Skip to content

Commit

Permalink
DRILL-7155: Create a standard logging message for batch sizes generat…
Browse files Browse the repository at this point in the history
…ed by individual operators.

This is needed for QA verification of the Batch Size feature DRILL-6238.
closes #1716
  • Loading branch information
Robert Hou authored and sohami committed Apr 5, 2019
1 parent e477480 commit ec70dec
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 17 deletions.
Expand Up @@ -208,8 +208,7 @@ public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentConte

hashAggMemoryManager = new HashAggMemoryManager(configuredBatchSize);

RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);
RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(), configuredBatchSize);

columnMapping = CaseInsensitiveMap.newHashMap();
}
Expand Down
Expand Up @@ -172,8 +172,8 @@ public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);

RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);
RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
configuredBatchSize);
}

@Override
Expand Down
Expand Up @@ -1261,8 +1261,8 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());


RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);
RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
configuredBatchSize);

enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null;
}
Expand Down
Expand Up @@ -115,6 +115,7 @@ public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
Preconditions.checkNotNull(left);
Preconditions.checkNotNull(right);
final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(), configOutputBatchSize);
implicitColumn = popConfig.getImplicitRIDColumn();

populateExcludedField(popConfig);
Expand Down
Expand Up @@ -138,8 +138,8 @@ protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, Record
final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, right);

RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);
RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
configuredBatchSize);

if (popConfig.getConditions().size() == 0) {
throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions");
Expand Down
Expand Up @@ -135,8 +135,8 @@ protected NestedLoopJoinBatch(NestedLoopJoinPOP popConfig, FragmentContext conte
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>());

RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);
RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
configuredBatchSize);
}

/**
Expand Down
Expand Up @@ -255,8 +255,8 @@ public void init(RecordBatch incomingBatch, ProjectRecordBatch outgoingBatch) {
setOutgoingBatch(outgoingBatch);
reset();

RecordBatchStats.logRecordBatchStats(outgoingBatch.getRecordBatchStatsContext(),
"configuredOutputSize: %d", getOutputBatchSize());
RecordBatchStats.printConfiguredBatchSize(outgoingBatch.getRecordBatchStatsContext(),
getOutputBatchSize());
}

private void reset() {
Expand Down
Expand Up @@ -78,8 +78,8 @@ public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, Fragment
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize);

RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);
RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
configuredBatchSize);
}

@Override
Expand Down
Expand Up @@ -136,6 +136,7 @@ public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws OutOfMem
pop.addUnnestBatch(this);
// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(), configuredBatchSize);
memoryManager = new UnnestMemoryManager(configuredBatchSize);
rowIdColumnName = pop.getImplicitColumn();
}
Expand Down
Expand Up @@ -336,8 +336,7 @@ private long normalizeMemorySizePerBatch() {
}

if (batchStatsContext.isEnableBatchSzLogging()) {
final String message = String.format("The Parquet reader batch memory has been set to [%d] byte(s)", normalizedMemorySize);
RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
RecordBatchStats.printConfiguredBatchSize(batchStatsContext, (int) normalizedMemorySize);
}

return normalizedMemorySize;
Expand Down
Expand Up @@ -271,6 +271,22 @@ public static String printAllocatorStats(BufferAllocator allocator) {
return msg.toString();
}

/**
* Prints the configured batch size
*
* @param batchStatsContext batch stats context object
* @param batchSize contains the configured batch size
*/
public static void printConfiguredBatchSize(RecordBatchStatsContext batchStatsContext,
int batchSize) {

if (!batchStatsContext.isEnableBatchSzLogging()) {
return; // NOOP
}
final String message = String.format("The batch memory has been set to [%d] byte(s)", batchSize);
logRecordBatchStats(message, batchStatsContext);
}

// ----------------------------------------------------------------------------
// Local Implementation
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -371,4 +387,4 @@ private static String toString(RecordBatchIOType ioType) {

}

}
}

0 comments on commit ec70dec

Please sign in to comment.