Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HIVE-26572: Support constant expressions in vectorization #3637

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ STAGE PLANS:
Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=20)
aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, 1, expectedEntries=20)
mode: final
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.hadoop.hive.ql.exec.vector;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.metadata.HiveException;
Expand Down Expand Up @@ -98,6 +100,8 @@ public class VectorAggregationDesc implements java.io.Serializable {

private final Class<? extends VectorAggregateExpression> vecAggrClass;

private List<ConstantVectorExpression> constants;

private GenericUDAFEvaluator evaluator;
private GenericUDAFEvaluator.Mode udafEvaluatorMode;

Expand Down Expand Up @@ -126,6 +130,18 @@ public VectorAggregationDesc(String aggregationName, GenericUDAFEvaluator evalua
this.vecAggrClass = vecAggrClass;
}

/**
 * Builds an aggregation descriptor that, in addition to the standard fields, carries a list of
 * constant scalar arguments ({@code constants}) to be passed to the vectorized aggregate
 * expression's constructor at instantiation time (e.g. the thread count for
 * bloom_filter merge — see the reflective lookup in VectorGroupByOperator).
 *
 * Delegates all standard fields to the existing constructor, so callers of the original
 * overload are unaffected.
 *
 * @param constants extra constant constructor arguments for the aggregate class; may be null
 *                  when the aggregation takes no constant parameters
 */
public VectorAggregationDesc(String aggregationName, GenericUDAFEvaluator evaluator,
    GenericUDAFEvaluator.Mode udafEvaluatorMode,
    TypeInfo inputTypeInfo, ColumnVector.Type inputColVectorType,
    VectorExpression inputExpression, TypeInfo outputTypeInfo,
    ColumnVector.Type outputColVectorType,
    Class<? extends VectorAggregateExpression> vecAggrClass,
    List<ConstantVectorExpression> constants) {
  this(aggregationName, evaluator, udafEvaluatorMode, inputTypeInfo, inputColVectorType, inputExpression,
      outputTypeInfo, outputColVectorType, vecAggrClass);
  this.constants = constants;
}

/** Returns the name of the aggregation function this descriptor represents. */
public String getAggregationName() {
  return this.aggregationName;
}
Expand Down Expand Up @@ -166,6 +182,10 @@ public Class<? extends VectorAggregateExpression> getVecAggrClass() {
return vecAggrClass;
}

/**
 * Returns the constant constructor arguments for the vectorized aggregate class,
 * or null when this descriptor was built without any.
 */
public List<ConstantVectorExpression> getConstants() {
  return this.constants;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
Expand All @@ -48,7 +50,6 @@
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFBloomFilterMerge;
import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase;
import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch;
import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperGeneral;
Expand Down Expand Up @@ -85,6 +86,19 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
private static final Logger LOG = LoggerFactory.getLogger(
VectorGroupByOperator.class.getName());

private static final Map<String, Class<?>> PRIMITIVE_TYPE_NAME_TO_CLASS = new ImmutableMap.Builder<String, Class<?>>()
asolimando marked this conversation as resolved.
Show resolved Hide resolved
.put("int", Integer.TYPE)
.put("long", Long.TYPE)
.put("double", Double.TYPE)
.put("float", Float.TYPE)
.put("bool", Boolean.TYPE)
.put("char", Character.TYPE)
.put("byte", Byte.TYPE)
.put("void", Void.TYPE)
.put("short", Short.TYPE)
.put("string", String.class)
.build();

private VectorizationContext vContext;
private VectorGroupByDesc vectorDesc;

Expand Down Expand Up @@ -1128,13 +1142,11 @@ protected void initializeOp(Configuration hconf) throws HiveException {
isLlap = LlapProxy.isDaemon();
VectorExpression.doTransientInit(keyExpressions, hconf);

List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
List<ObjectInspector> objectInspectors = new ArrayList<>();

List<ExprNodeDesc> keysDesc = conf.getKeys();
try {

List<String> outputFieldNames = conf.getOutputColumnNames();
final int outputCount = outputFieldNames.size();

for(int i = 0; i < outputKeyLength; ++i) {
VectorExpressionWriter vew = VectorExpressionWriterFactory.
Expand All @@ -1147,11 +1159,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
aggregators = new VectorAggregateExpression[aggregateCount];
for (int i = 0; i < aggregateCount; ++i) {
VectorAggregationDesc vecAggrDesc = vecAggrDescs[i];

Class<? extends VectorAggregateExpression> vecAggrClass = vecAggrDesc.getVecAggrClass();

VectorAggregateExpression vecAggrExpr =
instantiateExpression(vecAggrDesc, hconf);
VectorAggregateExpression vecAggrExpr = instantiateExpression(vecAggrDesc);
VectorExpression.doTransientInit(vecAggrExpr.getInputExpression(), hconf);
aggregators[i] = vecAggrExpr;

Expand Down Expand Up @@ -1216,38 +1224,67 @@ protected void initializeOp(Configuration hconf) throws HiveException {
}

@VisibleForTesting
VectorAggregateExpression instantiateExpression(VectorAggregationDesc vecAggrDesc,
Configuration hconf) throws HiveException {
Class<? extends VectorAggregateExpression> vecAggrClass = vecAggrDesc.getVecAggrClass();
VectorAggregateExpression instantiateExpression(VectorAggregationDesc vecAggrDesc) throws HiveException {
final Class<? extends VectorAggregateExpression> vecAggrClass = vecAggrDesc.getVecAggrClass();
final Constructor<? extends VectorAggregateExpression> ctor;

final List<ConstantVectorExpression> constants =
vecAggrDesc.getConstants() == null ? Collections.emptyList() : vecAggrDesc.getConstants();

final Class<?>[] ctorParamClasses = new Class<?>[constants.size() + 1];
ctorParamClasses[0] = VectorAggregationDesc.class;

final List<Object> values = new ArrayList<>(constants.size() + 1);
values.add(vecAggrDesc);

for (int i = 0; i < constants.size(); ++i) {
ConstantVectorExpression constant = constants.get(i);
String typeName = constant.getOutputTypeInfo().getTypeName();
if (!PRIMITIVE_TYPE_NAME_TO_CLASS.containsKey(typeName)) {
throw new IllegalArgumentException(
"Non-primitive type detected as " + i + "-th argument for a call to the vectorized aggregation class "
+ vecAggrClass.getSimpleName() + ", only primitive types are supported");
}
ctorParamClasses[i + 1] = PRIMITIVE_TYPE_NAME_TO_CLASS.get(typeName);

// this is needed to bring back to the right type the value, e.g. int-family always gets back a long,
// but in this way the constructor parameters won't match anymore, so we need to convert here
switch (typeName) {
asolimando marked this conversation as resolved.
Show resolved Hide resolved
case "byte":
values.add(constant.getBytesValue());
break;
case "float":
values.add(new Double(constant.getDoubleValue()).floatValue());
break;
case "double":
values.add(constant.getDoubleValue());
break;
case "int":
values.add(new Long(constant.getLongValue()).intValue());
break;
case "long":
values.add(constant.getLongValue());
break;
case "short":
values.add(new Long(constant.getLongValue()).shortValue());
break;
default:
values.add(constant.getValue());
}
}

Constructor<? extends VectorAggregateExpression> ctor = null;
try {
if (vecAggrDesc.getVecAggrClass() == VectorUDAFBloomFilterMerge.class) {
// VectorUDAFBloomFilterMerge is instantiated with a number of threads of parallel processing
ctor = vecAggrClass.getConstructor(VectorAggregationDesc.class, int.class);
} else {
ctor = vecAggrClass.getConstructor(VectorAggregationDesc.class);
}
ctor = vecAggrClass.getConstructor(ctorParamClasses);
} catch (Exception e) {
throw new HiveException(
"Constructor " + vecAggrClass.getSimpleName() + "(VectorAggregationDesc) not available", e);
"Constructor " + vecAggrClass.getSimpleName() + "(" + Arrays.toString(ctorParamClasses) + ") not available", e);
}
VectorAggregateExpression vecAggrExpr = null;
try {
if (vecAggrDesc.getVecAggrClass() == VectorUDAFBloomFilterMerge.class) {
vecAggrExpr = ctor.newInstance(vecAggrDesc,
hconf.getInt(HiveConf.ConfVars.TEZ_BLOOM_FILTER_MERGE_THREADS.varname,
HiveConf.ConfVars.TEZ_BLOOM_FILTER_MERGE_THREADS.defaultIntVal));
} else {
vecAggrExpr = ctor.newInstance(vecAggrDesc);
}
return ctor.newInstance(values.toArray(new Object[0]));
} catch (Exception e) {

throw new HiveException(
"Failed to create " + vecAggrClass.getSimpleName() + "(VectorAggregationDesc) object ",
e);
"Failed to create " + vecAggrClass.getSimpleName() + "(" + Arrays.toString(ctorParamClasses) + ") object ", e);
}
return vecAggrExpr;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
Expand All @@ -46,7 +47,7 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {

private long expectedEntries = -1;
private transient int aggBufferSize;
private transient int numThreads;
private transient int numThreads = HiveConf.ConfVars.TEZ_BLOOM_FILTER_MERGE_THREADS.defaultIntVal;

/**
* class for storing the current aggregate value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
import org.apache.hadoop.hive.ql.util.NullOrdering;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -636,6 +637,8 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
AggregationDesc max = new AggregationDesc("max",
FunctionRegistry.getGenericUDAFEvaluator("max", aggFnOIs, false, false),
params, false, Mode.PARTIAL1);
// we don't add numThreads here since PARTIAL1 mode is for VectorUDAFBloomFilter which does
// not support numThreads parameter
AggregationDesc bloomFilter = new AggregationDesc("bloom_filter",
FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", aggFnOIs, false, false),
params, false, Mode.PARTIAL1);
Expand Down Expand Up @@ -737,6 +740,10 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
rsValueCols.get(2).getTypeInfo(),
Utilities.ReduceField.VALUE + "." +
gbOutputNames.get(2), "", false));
int numThreads = parseContext.getConf().getInt(HiveConf.ConfVars.TEZ_BLOOM_FILTER_MERGE_THREADS.varname,
asolimando marked this conversation as resolved.
Show resolved Hide resolved
HiveConf.ConfVars.TEZ_BLOOM_FILTER_MERGE_THREADS.defaultIntVal);
PrimitiveTypeInfo intTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo("int");
asolimando marked this conversation as resolved.
Show resolved Hide resolved
bloomFilterFinalParams.add(new ExprNodeConstantDesc(intTypeInfo, numThreads));

AggregationDesc min = new AggregationDesc("min",
FunctionRegistry.getGenericUDAFEvaluator("min", minFinalFnOIs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
import org.apache.hadoop.hive.ql.util.NullOrdering;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

Expand Down Expand Up @@ -451,8 +452,22 @@ private static AggregationDesc bloomFilterAggregation(GenericUDAFEvaluator.Mode
bloomFilterEval.setMinEntries(conf.getLongVar(HiveConf.ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
bloomFilterEval.setFactor(conf.getFloatVar(HiveConf.ConfVars.TEZ_BLOOM_FILTER_FACTOR));
bloomFilterEval.setHintEntries(numEntriesHint);
List<ExprNodeDesc> p = Collections.singletonList(col);
AggregationDesc bloom = new AggregationDesc("bloom_filter", bloomFilterEval, p, false, mode);

List<ExprNodeDesc> params;

// numThreads is available only for VectorUDAFBloomFilterMerge, which only supports
// these two modes, don't add numThreads otherwise
if (GenericUDAFEvaluator.Mode.PARTIAL2.equals(mode)
asolimando marked this conversation as resolved.
Show resolved Hide resolved
|| GenericUDAFEvaluator.Mode.FINAL.equals(mode)) {
int numThreads = conf.getInt(HiveConf.ConfVars.TEZ_BLOOM_FILTER_MERGE_THREADS.varname,
asolimando marked this conversation as resolved.
Show resolved Hide resolved
HiveConf.ConfVars.TEZ_BLOOM_FILTER_MERGE_THREADS.defaultIntVal);
PrimitiveTypeInfo intTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo("int");
asolimando marked this conversation as resolved.
Show resolved Hide resolved
params = Arrays.asList(col, new ExprNodeConstantDesc(intTypeInfo, numThreads));
} else {
params = Collections.singletonList(col);
}

AggregationDesc bloom = new AggregationDesc("bloom_filter", bloomFilterEval, params, false, mode);
deniskuzZ marked this conversation as resolved.
Show resolved Hide resolved
// It is necessary to set the bloom filter evaluator otherwise there are runtime failures see HIVE-24018
bloom.setGenericUDAFWritableEvaluator(bloomFilterEval);
return bloom;
Expand Down