DRILL-6560: Enhanced the batch statistics logging enablement
closes #1355
sachouche authored and sohami committed Jul 12, 2018
1 parent cfe61eb commit 80fb761f2cef0dc78c573081aaf83e52b32b46fc
Showing 10 changed files with 186 additions and 107 deletions.
@@ -711,5 +711,8 @@ public static String bootDefaultFor(String name) {
public static final String STATS_LOGGING_FG_BATCH_SIZE_OPTION = "drill.exec.stats.logging.fine_grained.batch_size";
public static final BooleanValidator STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_FG_BATCH_SIZE_OPTION);

/** Controls the list of operators for which batch sizing stats should be enabled */
public static final String STATS_LOGGING_BATCH_OPERATOR_OPTION = "drill.exec.stats.logging.enabled_operators";
public static final StringValidator STATS_LOGGING_BATCH_OPERATOR_VALIDATOR = new StringValidator(STATS_LOGGING_BATCH_OPERATOR_OPTION);

}
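
Editor's note: the new option's value is a plain string naming the operators to trace. A minimal sketch of how a comma-separated, case-insensitive operator list could be turned into a membership test; the EnabledOperatorsFilter helper and its parsing rules are assumptions for illustration, not code from this commit:

import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Hypothetical helper (not in this commit) showing one way a value of
 * "drill.exec.stats.logging.enabled_operators" could be interpreted:
 * a comma-separated, case-insensitive list of operator names.
 */
final class EnabledOperatorsFilter {
  private final Set<String> enabled;

  EnabledOperatorsFilter(String optionValue) {
    this.enabled = Arrays.stream(optionValue.split(","))
        .map(String::trim)
        .filter(name -> !name.isEmpty())
        .map(name -> name.toLowerCase(Locale.ROOT))
        .collect(Collectors.toSet());
  }

  /** Returns true when batch stats logging should be on for this operator. */
  boolean isEnabled(String operatorName) {
    return enabled.contains(operatorName.toLowerCase(Locale.ROOT));
  }
}

Under these assumed semantics, new EnabledOperatorsFilter("ScanBatch, ParquetRecordReader").isEnabled("scanbatch") would return true.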
@@ -82,7 +82,7 @@ public class ScanBatch implements CloseableRecordBatch {
private final BufferAllocator allocator;
private final List<Map<String, String>> implicitColumnList;
private String currentReaderClassName;
private final RecordBatchStatsContext batchStatsLogging;
private final RecordBatchStatsContext batchStatsContext;

/**
*
@@ -121,7 +121,7 @@ public ScanBatch(FragmentContext context,
this.implicitColumnList = implicitColumnList;
addImplicitVectors();
currentReader = null;
batchStatsLogging = new RecordBatchStatsContext(context, oContext);
batchStatsContext = new RecordBatchStatsContext(context, oContext);
} finally {
oContext.getStats().stopProcessing();
}
@@ -304,12 +304,7 @@ private void logRecordBatchStats() {
return; // NOOP
}

RecordBatchStats.logRecordBatchStats(
batchStatsLogging.getContextOperatorId(),
getFQNForLogging(MAX_FQN_LENGTH),
this,
batchStatsLogging,
logger);
RecordBatchStats.logRecordBatchStats(getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
}

/** Might truncate the FQN if too long */
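
Editor's note: the five-argument call collapses into one helper invocation, so the enablement check and message formatting now live in a single place. Below is a simplified sketch of that gating pattern; the class shapes and the chosen log level are stand-ins, not Drill's actual RecordBatchStats API:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simplified stand-in for the centralized RecordBatchStats helper:
 * every call site passes a context object, and one choke point decides
 * whether to emit anything. Shapes are illustrative only.
 */
final class BatchStatsLoggerSketch {
  private static final Logger logger =
      LoggerFactory.getLogger(BatchStatsLoggerSketch.class);

  /** Mirrors the BATCH_STATS prefix the real code prepends to messages. */
  static final String BATCH_STATS_PREFIX = "BATCH_STATS";

  /** Minimal stand-in for RecordBatchStatsContext. */
  static final class StatsContext {
    private final boolean enableBatchSzLogging;

    StatsContext(boolean enableBatchSzLogging) {
      this.enableBatchSzLogging = enableBatchSzLogging;
    }

    boolean isEnableBatchSzLogging() {
      return enableBatchSzLogging;
    }
  }

  /** No-op unless batch-size logging was enabled for this context. */
  static void logRecordBatchStats(String message, StatsContext context) {
    if (!context.isEnableBatchSzLogging()) {
      return; // cheap early exit, mirroring the NOOP guard above
    }
    logger.info("{}: {}", BATCH_STATS_PREFIX, message);
  }
}

The payoff is that call sites no longer carry their own logger.isDebugEnabled() guards, and enablement is driven by options rather than logger configuration.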
@@ -46,8 +46,8 @@

/**
* <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
* Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
* persist between restarts.
* Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
* persist between restarts.
* </p>
*
* <p> All the system options are externalized into conf file. While adding a new system option
@@ -235,6 +235,7 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
};
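
Editor's note: the new validator is registered with the same SYSTEM_AND_SESSION metadata as the two existing stats options, making it settable per session as well as system-wide. A rough sketch of this registration-table pattern, with simplified names and guessed flag meanings (Drill's OptionDefinition/OptionMetaData carry more detail):

import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of the option-definition table above: each option name maps
 * to scope metadata, and lookups at set-time decide whether a given
 * scope may change the option. Flag names are guesses for illustration.
 */
final class OptionTableSketch {
  enum Scope { SYSTEM, SYSTEM_AND_SESSION }

  static final class OptionDef {
    final Scope scope;
    final boolean internal;   // assumed meaning of the first boolean
    final boolean adminOnly;  // assumed meaning of the second boolean

    OptionDef(Scope scope, boolean internal, boolean adminOnly) {
      this.scope = scope;
      this.internal = internal;
      this.adminOnly = adminOnly;
    }
  }

  private final Map<String, OptionDef> definitions = new HashMap<>();

  void register(String optionName, OptionDef def) {
    definitions.put(optionName, def);
  }

  boolean isSessionSettable(String optionName) {
    OptionDef def = definitions.get(optionName);
    return def != null && def.scope == Scope.SYSTEM_AND_SESSION;
  }
}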
@@ -35,6 +35,7 @@
import org.apache.drill.exec.store.parquet.ParquetReaderStats;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -240,7 +241,7 @@ public boolean useBulkReader() {
public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
this.operatorContext = operatorContext;
schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns());
batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead);
batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead, new RecordBatchStatsContext(fragmentContext, operatorContext));

logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
hadoopPath.toUri().getPath());
@@ -34,11 +34,11 @@
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowState;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.VarLenColumnBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.vector.ValueVector;

/** Class which handles reading a batch of rows from a set of variable columns */
public class VarLenBinaryReader {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLenBinaryReader.class);

final ParquetRecordReader parentReader;
final RecordBatchSizerManager batchSizer;
@@ -170,7 +170,8 @@ private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, int

// Lazy initialization
if (builder == null) {
builder = RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator());
builder = RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator(),
batchSizer.getBatchStatsContext());
}

final int numOverflowValues = columnStat.numValuesRead - batchNumRecords;
@@ -181,7 +182,7 @@ private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, int
// Register batch overflow data with the record batch sizer manager (if any)
if (builder != null) {
Map<String, FieldOverflowStateContainer> overflowContainerMap = parentReader.batchSizerMgr.getFieldOverflowMap();
Map<String, FieldOverflowDefinition> overflowDefMap = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();
Map<String, FieldOverflowDefinition> overflowDefMap = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();

for (Map.Entry<String, FieldOverflowDefinition> entry : overflowDefMap.entrySet()) {
FieldOverflowStateContainer overflowStateContainer = new FieldOverflowStateContainer(entry.getValue(), null);
@@ -197,9 +198,9 @@ private void reorderVLColumns() {
// Finally, re-order the variable length columns since an overflow occurred
Collections.sort(orderedColumns, comparator);

if (logger.isDebugEnabled()) {
boolean isFirstValue = true;
final StringBuilder msg = new StringBuilder(RecordBatchSizerManager.BATCH_STATS_PREFIX);
if (batchSizer.getBatchStatsContext().isEnableBatchSzLogging()) {
boolean isFirstValue = true;
final StringBuilder msg = new StringBuilder();
msg.append(": Dumping the variable length columns read order: ");

for (VLColumnContainer container : orderedColumns) {
@@ -212,7 +213,7 @@ private void reorderVLColumns() {
}
msg.append('.');

logger.debug(msg.toString());
RecordBatchStats.logRecordBatchStats(msg.toString(), batchSizer.getBatchStatsContext());
}
}

@@ -26,6 +26,8 @@
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.UInt1Vector;
import org.apache.drill.exec.vector.UInt4Vector;

@@ -44,7 +46,6 @@
* </ul>
*/
final class OverflowSerDeUtil {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OverflowSerDeUtil.class);

/**
* Serializes a collection of overflow fields into a memory buffer:
@@ -56,10 +57,12 @@ final class OverflowSerDeUtil {
*
* @param fieldOverflowEntries input collection of field overflow entries
* @param allocator buffer allocator
* @param batchStatsContext batch statistics context object
* @return record overflow container; null if the input buffer is empty
*/
static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowEntries,
BufferAllocator allocator) {
BufferAllocator allocator,
RecordBatchStatsContext batchStatsContext) {

if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) {
return null;
@@ -82,8 +85,9 @@ static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowE
// Allocate the required memory to serialize the overflow fields
final DrillBuf buffer = allocator.buffer(bufferLength);

if (logger.isDebugEnabled()) {
logger.debug(String.format("Allocated a buffer of length %d to handle overflow", bufferLength));
if (batchStatsContext.isEnableBatchSzLogging()) {
final String msg = String.format("Allocated a buffer of length [%d] to handle overflow", bufferLength);
RecordBatchStats.logRecordBatchStats(msg, batchStatsContext);
}

// Create the result object
@@ -24,6 +24,7 @@
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;

@@ -39,10 +40,11 @@ public final class RecordBatchOverflow {

/**
* @param allocator buffer allocator
* @param batchStatsContext batch statistics context
* @return new builder object
*/
public static Builder newBuilder(BufferAllocator allocator) {
return new Builder(allocator);
public static Builder newBuilder(BufferAllocator allocator, RecordBatchStatsContext batchStatsContext) {
return new Builder(allocator, batchStatsContext);
}

/**
@@ -75,13 +77,17 @@ public static final class Builder {
private final List<FieldOverflowEntry> fieldOverflowEntries = new ArrayList<FieldOverflowEntry>();
/** Buffer allocator */
private final BufferAllocator allocator;
/** Batch statistics context */
private final RecordBatchStatsContext batchStatsContext;

/**
* Build class to construct a {@link RecordBatchOverflow} object.
* @param allocator buffer allocator
* @param batchStatsContext batch statistics context
*/
private Builder(BufferAllocator allocator) {
private Builder(BufferAllocator allocator, RecordBatchStatsContext batchStatsContext) {
this.allocator = allocator;
this.batchStatsContext = batchStatsContext;
}

/**
@@ -101,9 +107,8 @@ public void addFieldOverflow(ValueVector vector, int firstValueIdx, int numValue
* @return a newly built {@link RecordBatchOverflow} object instance
*/
public RecordBatchOverflow build() {
RecordOverflowContainer overflowContainer = OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator);
RecordBatchOverflow result =
new RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator);
RecordOverflowContainer overflowContainer = OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator, batchStatsContext);
RecordBatchOverflow result = new RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator);

return result;
}
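
Editor's note: instead of reaching for a static logger, the builder now carries the stats context from newBuilder() through to the serialize() call. A compact sketch of threading a context object through a builder; all types here are placeholders, not Drill's classes:

import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the context-threading pattern above: whatever the builder
 * eventually constructs can log through the same context its creator
 * used, with no static logger lookups along the way.
 */
final class ContextThreadingSketch {
  interface StatsContext {
    boolean isEnableBatchSzLogging();
  }

  static final class OverflowBuilder {
    private final List<String> entries = new ArrayList<>();
    private final StatsContext batchStatsContext;

    private OverflowBuilder(StatsContext batchStatsContext) {
      this.batchStatsContext = batchStatsContext;
    }

    static OverflowBuilder newBuilder(StatsContext ctx) {
      return new OverflowBuilder(ctx);
    }

    OverflowBuilder addEntry(String entry) {
      entries.add(entry);
      return this;
    }

    String build() {
      if (batchStatsContext.isEnableBatchSzLogging()) {
        // The context travels with the builder, so even deep helpers
        // (serialize() in the real code) can gate their own logging.
        System.out.println("building overflow with " + entries.size() + " entries");
      }
      return String.join(",", entries);
    }
  }
}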
@@ -30,6 +30,8 @@
import org.apache.drill.exec.store.parquet.columnreaders.ParquetSchema;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;

@@ -39,7 +41,7 @@
*/
public final class RecordBatchSizerManager {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizerManager.class);
public static final String BATCH_STATS_PREFIX = "BATCH_STATS";


/** Minimum column memory size */
private static final int MIN_COLUMN_MEMORY_SZ = VarLenColumnBulkInput.getMinVLColumnMemorySize();
@@ -78,6 +80,9 @@ public final class RecordBatchSizerManager {
*/
private Map<String, FieldOverflowStateContainer> fieldOverflowMap = CaseInsensitiveMap.newHashMap();

/** For controlling batch statistics logging */
private final RecordBatchStatsContext batchStatsContext;

/**
* Constructor.
*
@@ -87,7 +92,8 @@ public final class RecordBatchSizerManager {
*/
public RecordBatchSizerManager(OptionManager options,
ParquetSchema schema,
long totalRecordsToRead) {
long totalRecordsToRead,
RecordBatchStatsContext batchStatsContext) {

this.schema = schema;
this.totalRecordsToRead = totalRecordsToRead;
@@ -97,6 +103,7 @@ public RecordBatchSizerManager(OptionManager options,
this.maxRecordsPerBatch = this.configRecordsPerBatch;
this.recordsPerBatch = this.configRecordsPerBatch;
this.overflowOptimizer = new BatchOverflowOptimizer(columnMemoryInfoMap);
this.batchStatsContext = batchStatsContext;
}

/**
@@ -130,6 +137,13 @@ public ParquetSchema getSchema() {
return schema;
}

/**
* @return batch statistics context
*/
public RecordBatchStatsContext getBatchStatsContext() {
return batchStatsContext;
}

/**
* Allocates value vectors for the current batch.
*
@@ -282,10 +296,9 @@ private int normalizeNumRecordsPerBatch() {
normalizedNumRecords = (int) totalRecordsToRead;
}

if (logger.isDebugEnabled()) {
final String message = String.format("%s: The Parquet reader number of record(s) has been set to [%d]",
BATCH_STATS_PREFIX, normalizedNumRecords);
logger.debug(message);
if (batchStatsContext.isEnableBatchSzLogging()) {
final String message = String.format("The Parquet reader number of record(s) has been set to [%d]", normalizedNumRecords);
RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
}

return normalizedNumRecords;
@@ -319,10 +332,9 @@ private int normalizeMemorySizePerBatch() {
logger.warn(message);
}

if (logger.isDebugEnabled()) {
final String message = String.format("%s: The Parquet reader batch memory has been set to [%d] byte(s)",
BATCH_STATS_PREFIX, normalizedMemorySize);
logger.debug(message);
if (batchStatsContext.isEnableBatchSzLogging()) {
final String message = String.format("The Parquet reader batch memory has been set to [%d] byte(s)", normalizedMemorySize);
RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
}

return normalizedMemorySize;
@@ -370,13 +382,12 @@ private void assignColumnsBatchMemory() {
assignFineGrainedMemoryQuota();

// log the new record batch if it changed
if (logger.isDebugEnabled()) {
if (batchStatsContext.isEnableBatchSzLogging()) {
assert recordsPerBatch <= maxRecordsPerBatch;

if (originalRecordsPerBatch != recordsPerBatch) {
final String message = String.format("%s: The Parquet records per batch [%d] has been decreased to [%d]",
BATCH_STATS_PREFIX, originalRecordsPerBatch, recordsPerBatch);
logger.debug(message);
final String message = String.format("The Parquet records per batch [%d] has been decreased to [%d]", originalRecordsPerBatch, recordsPerBatch);
RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
}

// Now dump the per column memory quotas
@@ -504,12 +515,12 @@ private double computeNeededMemoryRatio(MemoryRequirementContainer requiredMemor
}

private void dumpColumnMemoryQuotas() {
StringBuilder msg = new StringBuilder(BATCH_STATS_PREFIX);
StringBuilder msg = new StringBuilder();
msg.append(": Field Quotas:\n\tName\tType\tPrec\tQuota\n");

for (ColumnMemoryInfo columnInfo : columnMemoryInfoMap.values()) {
msg.append("\t");
msg.append(BATCH_STATS_PREFIX);
msg.append(RecordBatchStats.BATCH_STATS_PREFIX);
msg.append("\t");
msg.append(columnInfo.columnMeta.getField().getName());
msg.append("\t");
@@ -521,7 +532,7 @@ private void dumpColumnMemoryQuotas() {
msg.append("\n");
}

logger.debug(msg.toString());
RecordBatchStats.logRecordBatchStats(msg.toString(), batchStatsContext);
}

private static void printType(MaterializedField field, StringBuilder msg) {
