Skip to content
Permalink
Browse files
DRILL-7450: Improve performance for ANALYZE command
- Implement two-phase aggregation for the lowest metadata aggregate to optimize performance
- Allow using complex functions with hash aggregate
- Use hash aggregation for PHASE_1of2 for ANALYZE to reduce memory usage and avoid sorting non-aggregated data
- Add sort above hash aggregation to fix correctness of merge exchange and stream aggregate

closes #1907
  • Loading branch information
vvysotskyi committed Dec 4, 2019
1 parent de41559 commit 20293b63c0bb559ae35d57f7cb1ab7fa24e9ee6d
Show file tree
Hide file tree
Showing 56 changed files with 2,375 additions and 685 deletions.
@@ -94,24 +94,63 @@ Analyze command specific operators:
- `MetadataControllerBatch` - responsible for converting obtained metadata, fetching absent metadata from the Metastore
and storing resulting metadata into the Metastore.

`MetastoreAnalyzeTableHandler` forms plan depending on segments count in the following form:
`MetastoreAnalyzeTableHandler` forms plan depending on segments count in the following form:

```
MetadataControllerBatch
MetadataControllerRel
...
MetadataHandlerBatch
MetadataAggBatch(dir0, ...)
MetadataHandlerBatch
MetadataAggBatch(dir0, dir1, ...)
MetadataHandlerBatch
MetadataAggBatch(dir0, dir1, fqn, ...)
Scan(DYNAMIC_STAR **, ANY fqn, ...)
MetadataHandlerRel
MetadataAggRel(dir0, ...)
MetadataHandlerRel
MetadataAggRel(dir0, dir1, ...)
MetadataHandlerRel
MetadataAggRel(dir0, dir1, fqn, ...)
DrillScanRel(DYNAMIC_STAR **, ANY fqn, ...)
```

The lowest `MetadataAggBatch` creates required aggregate calls for every (or interesting only) table columns
For the case when `ANALYZE` uses columns for which statistics are present in Parquet metadata,
`ConvertMetadataAggregateToDirectScanRule` rule will be applied to the

```
MetadataAggRel(dir0, dir1, fqn, ...)
DrillScanRel(DYNAMIC_STAR **, ANY fqn, ...)
```

plan part and convert it to the `DrillDirectScanRel` populated with row group metadata for the case when `ANALYZE`
was done for `ROW_GROUP` metadata level.
For the case when metadata level in `ANALYZE` is not `ROW_GROUP`, the plan above will be converted into the following plan:

```
MetadataAggRel(metadataLevel=FILE (or another non-ROW_GROUP value), createNewAggregations=false)
DrillDirectScanRel
```

When it is converted into the physical plan, two-phase aggregation may be used when the incoming row
count exceeds the value of the `planner.slice_target` option. In this case, the lowest aggregation will be a hash
aggregation and it will be executed on the same minor fragments where the scan is performed. A `Sort` operator will be
placed above the hash aggregation. A `HashToMergeExchange` operator above the `Sort` will send the aggregated, sorted data to the
stream aggregate above.

Example of the resulting plan:

```
MetadataControllerPrel
...
MetadataStreamAggPrel(PHASE_1of1)
SortPrel
MetadataHandlerPrel
MetadataStreamAggPrel(PHASE_2of2)
HashToMergeExchangePrel
SortPrel
MetadataHashAggPrel(PHASE_1of2)
ScanPrel
```

The lowest `MetadataStreamAggBatch` (or `MetadataHashAggBatch` in the case of two-phase aggregation with
`MetadataStreamAggBatch` above) creates the required aggregate calls for every table column (or only the
interesting ones) and produces aggregations grouped by the segment columns that correspond to a specific table level.
`MetadataHandlerBatch` above it populates batch with additional information about metadata type and other info.
`MetadataAggBatch` above merges metadata calculated before to obtain metadata for parent metadata levels and also stores incoming data to populate it to the Metastore later.
`MetadataStreamAggBatch` above merges metadata calculated before to obtain metadata for parent metadata levels and also stores incoming data to populate it to the Metastore later.

`MetadataControllerBatch` obtains all calculated metadata, converts it to the suitable form and sends it to the Metastore.

@@ -71,7 +71,7 @@ public RowsMatch matches(StatisticsProvider<C> evaluator) {
* @param stat statistics object
* @return <tt>true</tt> if the input stat object is null or has invalid statistics; false otherwise
*/
static boolean isNullOrEmpty(ColumnStatistics stat) {
public static boolean isNullOrEmpty(ColumnStatistics stat) {
return stat == null
|| !stat.contains(ColumnStatisticsKind.MIN_VALUE)
|| !stat.contains(ColumnStatisticsKind.MAX_VALUE)
@@ -213,9 +213,7 @@ private void addProtectedBlockHA(ClassGenerator<?> g, JBlock sub, String body, H
declareVarArgArray(g.getModel(), sub, inputVariables);
}
for (int i = 0; i < inputVariables.length; i++) {
ValueReference parameter = getAttributeParameter(i);
HoldingContainer inputVariable = inputVariables[i];
declare(sub, parameter, inputVariable.getHolder().type(), inputVariable.getHolder(), i);
declare(g.getModel(), sub, inputVariables[i], i);
}
}

@@ -23,6 +23,8 @@
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
import org.apache.drill.exec.physical.impl.aggregate.StreamingAggTemplate;
import org.apache.drill.exec.record.VectorAccessibleComplexWriter;
@@ -56,10 +58,18 @@ public boolean isComplexWriterFuncHolder() {

@Override
public JVar[] renderStart(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables, FieldReference fieldReference) {
if (!classGenerator.getMappingSet().isHashAggMapping()) { //Declare workspace vars for non-hash-aggregation.
JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");

complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));

if (classGenerator.getMappingSet().isHashAggMapping()) {
// Default name is "col", if not passed in a reference name for the output vector.
String refName = fieldReference == null ? "col" : fieldReference.getRootSegment().getPath();
JClass cwClass = classGenerator.getModel().ref(VectorAccessibleComplexWriter.class);
classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));

return super.renderStart(classGenerator, inputVariables, fieldReference);
} else { //Declare workspace vars for non-hash-aggregation.
writerIdx = classGenerator.declareClassField("writerIdx", classGenerator.getModel()._ref(int.class));
lastWriterIdx = classGenerator.declareClassField("lastWriterIdx", classGenerator.getModel()._ref(int.class));
//Default name is "col", if not passed in a reference name for the output vector.
@@ -72,8 +82,6 @@ public JVar[] renderStart(ClassGenerator<?> classGenerator, HoldingContainer[] i
JVar[] workspaceJVars = declareWorkspaceVariables(classGenerator);
generateBody(classGenerator, ClassGenerator.BlockType.SETUP, setup(), null, workspaceJVars, true);
return workspaceJVars;
} else {
return super.renderStart(classGenerator, inputVariables, fieldReference);
}
}

@@ -84,26 +92,33 @@ public void renderMiddle(ClassGenerator<?> classGenerator, HoldingContainer[] in
getRegisteredNames()[0]));

JBlock sub = new JBlock(true, true);
JBlock topSub = sub;
JClass aggBatchClass = null;

if (classGenerator.getCodeGenerator().getDefinition() == StreamingAggTemplate.TEMPLATE_DEFINITION) {
aggBatchClass = classGenerator.getModel().ref(StreamingAggBatch.class);
} else if (classGenerator.getCodeGenerator().getDefinition() == HashAggTemplate.TEMPLATE_DEFINITION) {
aggBatchClass = classGenerator.getModel().ref(HashAggBatch.class);
}
assert aggBatchClass != null : "ComplexWriterAggFuncHolder should only be used with Streaming Aggregate Operator";

JExpression aggBatch = JExpr.cast(aggBatchClass, classGenerator.getMappingSet().getOutgoing());

classGenerator.getSetupBlock().add(aggBatch.invoke("addComplexWriter").arg(complexWriter));
// Only set the writer if there is a position change. Calling setPosition may cause underlying writers to allocate
// new vectors, thereby, losing the previously stored values
JBlock condAssignCW = classGenerator.getEvalBlock()._if(lastWriterIdx.ne(writerIdx))._then();
condAssignCW.add(complexWriter.invoke("setPosition").arg(writerIdx));
condAssignCW.assign(lastWriterIdx, writerIdx);
if (classGenerator.getMappingSet().isHashAggMapping()) {
classGenerator.getEvalBlock().add(
complexWriter
.invoke("setPosition")
.arg(classGenerator.getMappingSet().getWorkspaceIndex()));
} else {
JBlock condAssignCW = classGenerator.getEvalBlock()._if(lastWriterIdx.ne(writerIdx))._then();
condAssignCW.add(complexWriter.invoke("setPosition").arg(writerIdx));
condAssignCW.assign(lastWriterIdx, writerIdx);
}
sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter);

// add the subblock after the out declaration.
classGenerator.getEvalBlock().add(topSub);
classGenerator.getEvalBlock().add(sub);

addProtectedBlock(classGenerator, sub, add(), inputVariables, workspaceJVars, false);
classGenerator.getEvalBlock().directStatement(String.format("//---- end of eval portion of %s function. ----//",
@@ -124,7 +139,8 @@ public HoldingContainer renderEnd(ClassGenerator<?> classGenerator, HoldingConta
JExpr._new(classGenerator.getHolderType(getReturnType())));
}
classGenerator.getEvalBlock().add(sub);
if (getReturnType().getMinorType() == TypeProtos.MinorType.LATE) {
if (getReturnType().getMinorType() == TypeProtos.MinorType.LATE
&& !classGenerator.getMappingSet().isHashAggMapping()) {
sub.assignPlus(writerIdx, JExpr.lit(1));
}
addProtectedBlock(classGenerator, sub, output(), null, workspaceJVars, false);
@@ -218,43 +218,21 @@ protected void addProtectedBlock(ClassGenerator<?> g, JBlock sub, String body, H
if (decConstInputOnly && !inputVariables[i].isConstant()) {
continue;
}

ValueReference parameter = getAttributeParameter(i);
HoldingContainer inputVariable = inputVariables[i];
if (parameter.isFieldReader() && ! inputVariable.isReader()
&& ! Types.isComplex(inputVariable.getMajorType()) && inputVariable.getMinorType() != MinorType.UNION) {
JType singularReaderClass = g.getModel()._ref(TypeHelper.getHolderReaderImpl(inputVariable.getMajorType().getMinorType(),
inputVariable.getMajorType().getMode()));
JType fieldReadClass = getParamClass(g.getModel(), parameter, inputVariable.getHolder().type());
JInvocation reader = JExpr._new(singularReaderClass).arg(inputVariable.getHolder());
declare(sub, parameter, fieldReadClass, reader, i);
} else if (!parameter.isFieldReader() && inputVariable.isReader() && Types.isComplex(parameter.getType())) {
// For complex data-types (repeated maps/lists/dicts) the input to the aggregate will be a FieldReader. However, aggregate
// functions like ANY_VALUE, will assume the input to be a RepeatedMapHolder etc. Generate boilerplate code, to map
// from FieldReader to respective Holder.
if (Types.isComplex(parameter.getType())) {
JType holderClass = getParamClass(g.getModel(), parameter, inputVariable.getHolder().type());
JAssignmentTarget holderVar = declare(sub, parameter, holderClass, JExpr._new(holderClass), i);
sub.assign(holderVar.ref("reader"), inputVariable.getHolder());
}
} else {
JExpression exprToAssign = inputVariable.getHolder();
if (parameter.isVarArg() && parameter.isFieldReader() && Types.isUnion(inputVariable.getMajorType())) {
exprToAssign = exprToAssign.ref("reader");
}
declare(sub, parameter, inputVariable.getHolder().type(), exprToAssign, i);
}
declare(g.getModel(), sub, inputVariables[i], i);
}
}

JVar[] internalVars = new JVar[workspaceJVars.length];
for (int i = 0; i < workspaceJVars.length; i++) {
if (decConstInputOnly) {
internalVars[i] = sub.decl(g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()), attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
internalVars[i] = sub.decl(
g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()),
attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
} else {
internalVars[i] = sub.decl(g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()), attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
internalVars[i] = sub.decl(
g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()),
attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
}

}

Preconditions.checkNotNull(body);
@@ -266,6 +244,48 @@ protected void addProtectedBlock(ClassGenerator<?> g, JBlock sub, String body, H
}
}

/**
 * Declares the attribute parameter which corresponds to the specified {@code currentIndex}
 * in the specified {@code jBlock}, considering its type: a plain holder may need to be
 * wrapped into a holder reader, a reader may need to be mapped onto a complex holder,
 * or the holder may be passed through as-is.
 *
 * @param model         code model used to generate the code
 * @param jBlock        block of code to be populated
 * @param inputVariable input variable for the current function
 * @param currentIndex  index of the current parameter
 */
protected void declare(JCodeModel model, JBlock jBlock,
    HoldingContainer inputVariable, int currentIndex) {
  ValueReference parameter = getAttributeParameter(currentIndex);
  if (parameter.isFieldReader()
      && !inputVariable.isReader()
      && !Types.isComplex(inputVariable.getMajorType())
      && inputVariable.getMinorType() != MinorType.UNION) {
    // The parameter expects a FieldReader but a plain holder is provided:
    // wrap the holder into the matching singular holder-reader implementation.
    JType singularReaderClass = model._ref(
        TypeHelper.getHolderReaderImpl(inputVariable.getMajorType().getMinorType(),
            inputVariable.getMajorType().getMode()));
    JType fieldReadClass = getParamClass(model, parameter, inputVariable.getHolder().type());
    JInvocation reader = JExpr._new(singularReaderClass).arg(inputVariable.getHolder());
    declare(jBlock, parameter, fieldReadClass, reader, currentIndex);
  } else if (!parameter.isFieldReader()
      && inputVariable.isReader()
      && Types.isComplex(parameter.getType())) {
    // For complex data-types (repeated maps/lists/dicts) the input to the aggregate will be a
    // FieldReader. However, aggregate functions like ANY_VALUE will assume the input to be a
    // RepeatedMapHolder etc. Generate boilerplate code to map from the FieldReader to the
    // respective holder. (The redundant re-check of Types.isComplex(parameter.getType())
    // that duplicated the branch condition was removed — it was always true here.)
    JType holderClass = getParamClass(model, parameter, inputVariable.getHolder().type());
    JAssignmentTarget holderVar = declare(jBlock, parameter, holderClass, JExpr._new(holderClass), currentIndex);
    jBlock.assign(holderVar.ref("reader"), inputVariable.getHolder());
  } else {
    JExpression exprToAssign = inputVariable.getHolder();
    if (parameter.isVarArg() && parameter.isFieldReader() && Types.isUnion(inputVariable.getMajorType())) {
      // A vararg FieldReader parameter receiving a UNION holder: pass the holder's reader directly.
      exprToAssign = exprToAssign.ref("reader");
    }
    declare(jBlock, parameter, inputVariable.getHolder().type(), exprToAssign, currentIndex);
  }
}

/**
* Declares array for storing vararg function arguments.
*
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.metastore;

import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.server.options.OptionManager;

import java.util.StringJoiner;

/**
* Holds system / session options that are used for obtaining partition / implicit / special column names.
*/
public class ColumnNamesOptions {

  // Names of implicit / partition columns, resolved once from the option manager.
  private final String fullyQualifiedName;
  private final String partitionColumnNameLabel;
  private final String rowGroupIndex;
  private final String rowGroupStart;
  private final String rowGroupLength;
  private final String lastModifiedTime;

  /**
   * Resolves all column-name labels from the given option manager.
   *
   * @param optionManager source of the system / session option values
   */
  public ColumnNamesOptions(OptionManager optionManager) {
    this.fullyQualifiedName = optionManager.getOption(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL).string_val;
    this.partitionColumnNameLabel = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
    this.rowGroupIndex = optionManager.getOption(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL).string_val;
    this.rowGroupStart = optionManager.getOption(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL).string_val;
    this.rowGroupLength = optionManager.getOption(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL).string_val;
    this.lastModifiedTime = optionManager.getOption(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL).string_val;
  }

  /** @return label of the fully qualified name column */
  public String fullyQualifiedName() {
    return fullyQualifiedName;
  }

  /** @return label of the filesystem partition columns (dir0, dir1, ...) */
  public String partitionColumnNameLabel() {
    return partitionColumnNameLabel;
  }

  /** @return label of the implicit row group index column */
  public String rowGroupIndex() {
    return rowGroupIndex;
  }

  /** @return label of the implicit row group start column */
  public String rowGroupStart() {
    return rowGroupStart;
  }

  /** @return label of the implicit row group length column */
  public String rowGroupLength() {
    return rowGroupLength;
  }

  /** @return label of the implicit last modified time column */
  public String lastModifiedTime() {
    return lastModifiedTime;
  }

  @Override
  public String toString() {
    // Produces the same representation as the StringJoiner-based version:
    // ColumnNamesOptions[fullyQualifiedName='...', ..., lastModifiedTime='...']
    return ColumnNamesOptions.class.getSimpleName() + "["
        + "fullyQualifiedName='" + fullyQualifiedName + "'"
        + ", partitionColumnNameLabel='" + partitionColumnNameLabel + "'"
        + ", rowGroupIndex='" + rowGroupIndex + "'"
        + ", rowGroupStart='" + rowGroupStart + "'"
        + ", rowGroupLength='" + rowGroupLength + "'"
        + ", lastModifiedTime='" + lastModifiedTime + "'"
        + "]";
  }
}

0 comments on commit 20293b6

Please sign in to comment.