Skip to content

Commit

Permalink
DRILL-6709: Extended the batch stats utility to other operators
Browse files Browse the repository at this point in the history
closes #1444
  • Loading branch information
sachouche authored and Ben-Zvi committed Sep 8, 2018
1 parent fa0d78d commit 85ebae5
Show file tree
Hide file tree
Showing 16 changed files with 300 additions and 174 deletions.
Expand Up @@ -48,6 +48,7 @@
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
Expand Down Expand Up @@ -302,7 +303,7 @@ private void logRecordBatchStats() {
return; // NOOP
}

RecordBatchStats.logRecordBatchStats(getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
}

/** Might truncate the FQN if too long */
Expand Down
Expand Up @@ -60,6 +60,8 @@
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
Expand Down Expand Up @@ -159,9 +161,7 @@ public void update(RecordBatch incomingRecordBatch) {
}

updateIncomingStats();
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
}
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());
}
}

Expand Down Expand Up @@ -204,7 +204,9 @@ public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentConte
}

hashAggMemoryManager = new HashAggMemoryManager(configuredBatchSize);
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);

RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);

columnMapping = CaseInsensitiveMap.newHashMap();
}
Expand Down Expand Up @@ -474,15 +476,15 @@ private void updateStats() {
stats.setLongStat(HashAggTemplate.Metric.AVG_OUTPUT_ROW_BYTES, hashAggMemoryManager.getAvgOutputRowWidth());
stats.setLongStat(HashAggTemplate.Metric.OUTPUT_RECORD_COUNT, hashAggMemoryManager.getTotalOutputRecords());

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(),
hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords());
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"incoming aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(),
hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords());

logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(),
hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords());
}
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"outgoing aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(),
hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords());
}
@Override
public void close() {
Expand Down
Expand Up @@ -70,7 +70,8 @@
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;

import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.AllocationHelper;

import org.apache.drill.exec.vector.FixedWidthVector;
Expand Down Expand Up @@ -1223,10 +1224,7 @@ public AggIterOutcome outputCurrentBatch() {
}

outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(outgoing));
}
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext());

this.outcome = IterOutcome.OK;

Expand Down
Expand Up @@ -52,6 +52,8 @@
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
Expand Down Expand Up @@ -157,7 +159,7 @@ public void update() {
// i.e. all rows fit within memory budget.
setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));

logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());

updateIncomingStats();
}
Expand All @@ -170,7 +172,8 @@ public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);

logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);
}

@Override
Expand Down Expand Up @@ -261,10 +264,7 @@ protected IterOutcome doWork() {
}

flattenMemoryManager.updateOutgoingStats(outputRecords);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());

// Get the final outcome based on hasRemainder since that will determine if all the incoming records were
// consumed in current output batch or not
Expand Down Expand Up @@ -516,15 +516,15 @@ private void updateStats() {
stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth());
stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords());

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"incoming aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());

logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
}
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"outgoing aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
}

@Override
Expand Down
Expand Up @@ -68,9 +68,10 @@
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
Expand Down Expand Up @@ -291,7 +292,7 @@ private void prefetchFirstBuildBatch() {
buildBatch,
() -> {
batchMemoryManager.update(RIGHT_INDEX, 0, true);
logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT, batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
});
}

Expand All @@ -306,7 +307,7 @@ private void prefetchFirstProbeBatch() {
probeBatch,
() -> {
batchMemoryManager.update(LEFT_INDEX, 0);
logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
});
}

Expand Down Expand Up @@ -488,9 +489,7 @@ public IterOutcome innerNext() {
container.setRecordCount(outputRecords);

batchMemoryManager.updateOutgoingStats(outputRecords);
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());

/* We are here because of one of the following
* 1. Completed processing of all the records and we are done
Expand Down Expand Up @@ -1121,12 +1120,17 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);

RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d",
configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);

batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);


RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);

enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
}

Expand Down Expand Up @@ -1242,19 +1246,20 @@ public void close() {

updateMetrics();

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
}
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());

this.cleanup();
super.close();
Expand Down
Expand Up @@ -39,6 +39,8 @@
import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;

Expand Down Expand Up @@ -240,25 +242,26 @@ public IterOutcome innerNext() {
public void close() {
updateBatchMemoryManagerStats();

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
"record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
"record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
"record count : {}", batchMemoryManager.getNumOutgoingBatches(),
batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(),
batchMemoryManager.getTotalOutputRecords());
}
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, " +
"record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, " +
"record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, " +
"record count : %d", batchMemoryManager.getNumOutgoingBatches(),
batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(),
batchMemoryManager.getTotalOutputRecords());

super.close();
}
Expand Down Expand Up @@ -733,11 +736,10 @@ private void finalizeOutputContainer() {

batchMemoryManager.updateOutgoingStats(outputIndex);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]",
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"Number of records emitted: %d and Allocator Stats: [AllocatedMem: %d, PeakMem: %d]",
outputIndex, container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
}

// Update the output index for next output batch to zero
outputIndex = 0;
Expand Down Expand Up @@ -1182,10 +1184,11 @@ private void updateMemoryManager(int inputIndex) {
// a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == LEFT_INDEX ? "left" : "right",
batchMemoryManager.getRecordBatchSizer(inputIndex));
logger.debug("Previous OutputRowCount: {}, New OutputRowCount: {}", maxOutputRowCount, newOutputRowCount);
if (isRecordBatchStatsLoggingEnabled()) {
RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, newOutputRowCount);
}

if (useMemoryManager) {
Expand Down

0 comments on commit 85ebae5

Please sign in to comment.