Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private AggregationDesc buildAggregationDesc(
AggregationDesc agg = new AggregationDesc();
ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo);
GenericUDAFEvaluator genericUDAFEvaluator = FunctionRegistry.getGenericUDAFEvaluator(aggregate,
ImmutableList.of(oi).asList(), false, false);
ImmutableList.of(oi).asList(), false, false, false);
agg.setGenericUDAFEvaluator(genericUDAFEvaluator);
if (aggregate.equals("bloom_filter")) {
GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator udafBloomFilterEvaluator =
Expand Down
10 changes: 5 additions & 5 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,7 @@ public static TypeInfo getCommonClassForStruct(StructTypeInfo a, StructTypeInfo
@SuppressWarnings("deprecation")
public static GenericUDAFEvaluator getGenericUDAFEvaluator(String name,
List<ObjectInspector> argumentOIs, boolean isDistinct,
boolean isAllColumns) throws SemanticException {
boolean isAllColumns, boolean isMapAggr) throws SemanticException {

GenericUDAFResolver udafResolver = getGenericUDAFResolver(name);
if (udafResolver == null) {
Expand All @@ -1257,7 +1257,7 @@ public static GenericUDAFEvaluator getGenericUDAFEvaluator(String name,

GenericUDAFParameterInfo paramInfo =
new SimpleGenericUDAFParameterInfo(
args, false, isDistinct, isAllColumns);
args, false, isDistinct, isAllColumns, true, isMapAggr);

GenericUDAFEvaluator udafEvaluator;
if (udafResolver instanceof GenericUDAFResolver2) {
Expand All @@ -1271,12 +1271,12 @@ public static GenericUDAFEvaluator getGenericUDAFEvaluator(String name,

public static GenericUDAFEvaluator getGenericWindowingEvaluator(String name,
List<ObjectInspector> argumentOIs, boolean isDistinct,
boolean isAllColumns, boolean respectNulls) throws SemanticException {
boolean isAllColumns, boolean respectNulls, boolean isMapAggr) throws SemanticException {
Registry registry = SessionState.getRegistry();
GenericUDAFEvaluator evaluator = registry == null ? null :
registry.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls);
registry.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls, isMapAggr);
return evaluator != null ? evaluator :
system.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls);
system.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls, isMapAggr);
}

public static GenericUDAFResolver getGenericUDAFResolver(String functionName)
Expand Down
10 changes: 5 additions & 5 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ public void getFunctionSynonyms(
@SuppressWarnings("deprecation")
public GenericUDAFEvaluator getGenericUDAFEvaluator(String name,
List<ObjectInspector> argumentOIs, boolean isWindowing, boolean isDistinct,
boolean isAllColumns, boolean respectNulls) throws SemanticException {
boolean isAllColumns, boolean respectNulls, boolean isMapAggr) throws SemanticException {

GenericUDAFResolver udafResolver = getGenericUDAFResolver(name);
if (udafResolver == null) {
Expand All @@ -494,7 +494,7 @@ public GenericUDAFEvaluator getGenericUDAFEvaluator(String name,

GenericUDAFParameterInfo paramInfo =
new SimpleGenericUDAFParameterInfo(
args, isWindowing, isDistinct, isAllColumns, respectNulls);
args, isWindowing, isDistinct, isAllColumns, respectNulls, isMapAggr);
if (udafResolver instanceof GenericUDAFResolver2) {
udafEvaluator =
((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
Expand All @@ -505,7 +505,7 @@ public GenericUDAFEvaluator getGenericUDAFEvaluator(String name,
}

public GenericUDAFEvaluator getGenericWindowingEvaluator(String functionName,
List<ObjectInspector> argumentOIs, boolean isDistinct, boolean isAllColumns, boolean respectNulls)
List<ObjectInspector> argumentOIs, boolean isDistinct, boolean isAllColumns, boolean respectNulls, boolean isMapAggr)
throws SemanticException {
functionName = functionName.toLowerCase();
WindowFunctionInfo info = getWindowFunctionInfo(functionName);
Expand All @@ -514,14 +514,14 @@ public GenericUDAFEvaluator getGenericWindowingEvaluator(String functionName,
}
if (!functionName.equals(FunctionRegistry.LEAD_FUNC_NAME) &&
!functionName.equals(FunctionRegistry.LAG_FUNC_NAME)) {
return getGenericUDAFEvaluator(functionName, argumentOIs, true, isDistinct, isAllColumns, respectNulls);
return getGenericUDAFEvaluator(functionName, argumentOIs, true, isDistinct, isAllColumns, respectNulls, isMapAggr);
}

// this must be lead/lag UDAF
ObjectInspector args[] = new ObjectInspector[argumentOIs.size()];
GenericUDAFResolver udafResolver = info.getGenericUDAFResolver();
GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo(
argumentOIs.toArray(args), true, isDistinct, isAllColumns);
argumentOIs.toArray(args), true, isDistinct, isAllColumns, true, isMapAggr);
return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,9 @@ private GroupByOperator genMapGroupby3(GroupByOperator mGby2,
aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(), paraExpression,
paraExprInfo.getTabAlias(), paraExprInfo.getIsVirtualCol()));
Mode amode = SemanticAnalyzer.groupByDescModeToUDAFMode(GroupByDesc.Mode.HASH, false);
Boolean isMapAggr = HiveConf.getBoolVar(pGraphContext.getConf(), HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE);
GenericUDAFEvaluator genericUDAFEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator(
"count", aggParameters, null, false, false);
"count", aggParameters, null, false, false, isMapAggr);
assert (genericUDAFEvaluator != null);
GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo(genericUDAFEvaluator, amode,
aggParameters);
Expand Down Expand Up @@ -467,8 +468,9 @@ private GroupByOperator genReduceGroupby(ReduceSinkOperator rs2,
ArrayList<ExprNodeDesc> aggParameters = new ArrayList<ExprNodeDesc>();
aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(), paraExpression, paraExprInfo
.getTabAlias(), paraExprInfo.getIsVirtualCol()));
Boolean isMapAggr = HiveConf.getBoolVar(pGraphContext.getConf(), HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE);
GenericUDAFEvaluator genericUDAFEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator("count",
aggParameters, null, false, false);
aggParameters, null, false, false, isMapAggr);
assert (genericUDAFEvaluator != null);
Mode amode = SemanticAnalyzer.groupByDescModeToUDAFMode(GroupByDesc.Mode.MERGEPARTIAL, false);
GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo(genericUDAFEvaluator, amode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,16 +633,17 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex

ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
try {
Boolean isMapAggr = HiveConf.getBoolVar(parseContext.getConf(), HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE);
AggregationDesc min = new AggregationDesc("min",
FunctionRegistry.getGenericUDAFEvaluator("min", aggFnOIs, false, false),
FunctionRegistry.getGenericUDAFEvaluator("min", aggFnOIs, false, false, isMapAggr),
params, false, Mode.PARTIAL1);
AggregationDesc max = new AggregationDesc("max",
FunctionRegistry.getGenericUDAFEvaluator("max", aggFnOIs, false, false),
FunctionRegistry.getGenericUDAFEvaluator("max", aggFnOIs, false, false, isMapAggr),
params, false, Mode.PARTIAL1);
// we don't add numThreads here since PARTIAL1 mode is for VectorUDAFBloomFilter which does
// not support numThreads parameter
AggregationDesc bloomFilter = new AggregationDesc(BLOOM_FILTER_FUNCTION,
FunctionRegistry.getGenericUDAFEvaluator(BLOOM_FILTER_FUNCTION, aggFnOIs, false, false),
FunctionRegistry.getGenericUDAFEvaluator(BLOOM_FILTER_FUNCTION, aggFnOIs, false, false, isMapAggr),
params, false, Mode.PARTIAL1);
GenericUDAFBloomFilterEvaluator bloomFilterEval =
(GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
Expand Down Expand Up @@ -746,17 +747,18 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
TypeInfo intTypeInfo = TypeInfoFactory.getPrimitiveTypeInfoFromJavaPrimitive(Integer.TYPE);
bloomFilterFinalParams.add(new ExprNodeConstantDesc(intTypeInfo, numThreads));

Boolean isMapAggr = HiveConf.getBoolVar(parseContext.getConf(), HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE);
AggregationDesc min = new AggregationDesc("min",
FunctionRegistry.getGenericUDAFEvaluator("min", minFinalFnOIs,
false, false),
false, false, isMapAggr),
minFinalParams, false, Mode.FINAL);
AggregationDesc max = new AggregationDesc("max",
FunctionRegistry.getGenericUDAFEvaluator("max", maxFinalFnOIs,
false, false),
false, false, isMapAggr),
maxFinalParams, false, Mode.FINAL);
AggregationDesc bloomFilter = new AggregationDesc(BLOOM_FILTER_FUNCTION,
FunctionRegistry.getGenericUDAFEvaluator(BLOOM_FILTER_FUNCTION, bloomFilterFinalFnOIs,
false, false),
false, false, isMapAggr),
bloomFilterFinalParams, false, Mode.FINAL);
GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
bloomFilterEval.setSourceOperator(selectOp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,13 @@ private static GBInfo getGBInfo(HiveAggregate aggRel, OpAttr inputOpAf, HiveConf
gbInfo.distColIndices.add(distColIndicesOfUDAF);
}

Boolean isMapAggr = HiveConf.getBoolVar(hc, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE);
// special handling for count, similar to PlanModifierForASTConv::replaceEmptyGroupAggr()
udafAttrs.udafEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator(udafAttrs.udafName,
new ArrayList<ExprNodeDesc>(udafAttrs.udafParams), new ASTNode(),
udafAttrs.isDistinctUDAF, udafAttrs.udafParams.size() == 0 &&
"count".equalsIgnoreCase(udafAttrs.udafName) ? true : false);
"count".equalsIgnoreCase(udafAttrs.udafName) ? true : false,
isMapAggr);
gbInfo.udafAttrs.add(udafAttrs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3698,8 +3698,9 @@ private AggregateInfo getHiveAggInfo(ASTNode aggAst, int aggFnLstArgIndx, RowRes
boolean isAllColumns = aggAst.getType() == HiveParser.TOK_FUNCTIONSTAR;
String aggName = unescapeIdentifier(aggAst.getChild(0).getText());

Boolean isMapAggr = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE);
AggregateInfo aInfo = functionHelper.getWindowAggregateFunctionInfo(
isDistinct, isAllColumns, aggName, aggParameters);
isDistinct, isAllColumns, isMapAggr, aggName, aggParameters);

// If that did not work, try GenericUDF translation
if (aInfo == null) {
Expand Down Expand Up @@ -3854,8 +3855,9 @@ private RelNode genGBLogicalPlan(QB qb, RelNode srcRel) throws SemanticException
aggParameters.add(parameterExpr);
}

Boolean isMapAggr = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE);
AggregateInfo aInfo = functionHelper.getAggregateFunctionInfo(
isDistinct, isAllColumns, aggName, aggParameters, fieldCollations);
isDistinct, isAllColumns, isMapAggr, aggName, aggParameters, fieldCollations);
aggregations.add(aInfo);
String field = getColumnInternalName(groupingColsSize + aggregations.size() - 1);
outputColumnNames.add(field);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,12 @@ private WindowFunctionDef translate(WindowTableFunctionDef wdwTFnDef,
if (wFnInfo == null) {
throw new SemanticException(ErrorMsg.INVALID_FUNCTION.getMsg(spec.getName()));
}
Boolean isMapAggr = HiveConf.getBoolVar(hCfg, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE);
WindowFunctionDef def = new WindowFunctionDef();
def.setName(spec.getName());
def.setAlias(spec.getAlias());
def.setDistinct(spec.isDistinct());
def.setMapAggr(isMapAggr);
def.setExpressionTreeString(spec.getExpression().toStringTree());
def.setStar(spec.isStar());
def.setPivotResult(wFnInfo.isPivotResult());
Expand Down Expand Up @@ -583,7 +585,7 @@ static void setupWdwFnEvaluator(WindowFunctionDef def) throws HiveException {

GenericUDAFEvaluator wFnEval = FunctionRegistry.getGenericWindowingEvaluator(def.getName(),
argOIs,
def.isDistinct(), def.isStar(), def.respectNulls());
def.isDistinct(), def.isStar(), def.respectNulls(), def.isMapAggr());
ObjectInspector OI = wFnEval.init(GenericUDAFEvaluator.Mode.COMPLETE, funcArgOIs);
def.setWFnEval(wFnEval);
def.setOI(OI);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4985,18 +4985,18 @@ static List<ObjectInspector> getWritableObjectInspector(List<ExprNodeDesc> exprs
*/
public static GenericUDAFEvaluator getGenericUDAFEvaluator(String aggName,
List<ExprNodeDesc> aggParameters, ASTNode aggTree,
boolean isDistinct, boolean isAllColumns)
boolean isDistinct, boolean isAllColumns, boolean isMapAggr)
throws SemanticException {
return getGenericUDAFEvaluator2(aggName, getWritableObjectInspector(aggParameters),
aggTree, isDistinct, isAllColumns);
aggTree, isDistinct, isAllColumns, isMapAggr);
}

public static GenericUDAFEvaluator getGenericUDAFEvaluator2(String aggName,
List<ObjectInspector> aggParameterOIs, ASTNode aggTree,
boolean isDistinct, boolean isAllColumns)
boolean isDistinct, boolean isAllColumns, boolean isMapAggr)
throws SemanticException {
GenericUDAFEvaluator result = FunctionRegistry.getGenericUDAFEvaluator(
aggName, aggParameterOIs, isDistinct, isAllColumns);
aggName, aggParameterOIs, isDistinct, isAllColumns, isMapAggr);
if (null == result) {
String reason = "Looking for UDAF Evaluator\"" + aggName
+ "\" with parameters " + aggParameterOIs;
Expand Down Expand Up @@ -5224,9 +5224,10 @@ private Operator genGroupByPlanGroupByOperator(QBParseInfo parseInfo,
if (isDistinct) {
numDistinctUDFs++;
}
Boolean isMapAggr = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE);
Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);
GenericUDAFEvaluator genericUDAFEvaluator = getGenericUDAFEvaluator(
aggName, aggParameters, value, isDistinct, isAllColumns);
aggName, aggParameters, value, isDistinct, isAllColumns, isMapAggr);
assert (genericUDAFEvaluator != null);
GenericUDAFInfo udaf = getGenericUDAFInfo(genericUDAFEvaluator, amode, aggParameters);
aggregations.add(new AggregationDesc(aggName.toLowerCase(),
Expand Down Expand Up @@ -5672,8 +5673,9 @@ private Operator genGroupByPlanMapGroupByOperator(QB qb,
boolean isAllColumns = value.getType() == HiveParser.TOK_FUNCTIONSTAR;
Mode amode = groupByDescModeToUDAFMode(GroupByDesc.Mode.HASH, isDistinct);

Boolean isMapAggr = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE);
GenericUDAFEvaluator genericUDAFEvaluator = getGenericUDAFEvaluator(
aggName, aggParameters, value, isDistinct, isAllColumns);
aggName, aggParameters, value, isDistinct, isAllColumns, isMapAggr);
assert (genericUDAFEvaluator != null);
GenericUDAFInfo udaf = getGenericUDAFInfo(genericUDAFEvaluator, amode,
aggParameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ RexNode getExpression(String functionText, FunctionInfo functionInfo,
/**
* Returns aggregation information based on given parameters.
*/
AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns,
AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, boolean isMapAggr,
String aggregateName, List<RexNode> aggregateParameters, List<FieldCollation> fieldCollations)
throws SemanticException;

/**
* Returns aggregation information for analytical function based on given parameters.
*/
AggregateInfo getWindowAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns,
AggregateInfo getWindowAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, boolean isMapAggr,
String aggregateName, List<RexNode> aggregateParameters)
throws SemanticException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public Void visitCall(final RexCall call) {
* {@inheritDoc}
*/
@Override
public AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns,
public AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, boolean isMapAggr,
String aggregateName, List<RexNode> aggregateParameters,
List<FieldCollation> fieldCollations)
throws SemanticException {
Expand All @@ -387,7 +387,7 @@ public AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllC
}
}
GenericUDAFEvaluator genericUDAFEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator2(
aggregateName, aggParameterOIs, null, isDistinct, isAllColumns);
aggregateName, aggParameterOIs, null, isDistinct, isAllColumns, isMapAggr);
assert (genericUDAFEvaluator != null);
GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo2(
genericUDAFEvaluator, udafMode, aggParameterOIs);
Expand All @@ -398,7 +398,7 @@ public AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllC
* {@inheritDoc}
*/
@Override
public AggregateInfo getWindowAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns,
public AggregateInfo getWindowAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, boolean isMapAggr,
String aggregateName, List<RexNode> aggregateParameters)
throws SemanticException {
TypeInfo returnType = null;
Expand All @@ -422,13 +422,13 @@ public AggregateInfo getWindowAggregateFunctionInfo(boolean isDistinct, boolean
if (aggregateName.toLowerCase().equals(FunctionRegistry.LEAD_FUNC_NAME)
|| aggregateName.toLowerCase().equals(FunctionRegistry.LAG_FUNC_NAME)) {
GenericUDAFEvaluator genericUDAFEvaluator = FunctionRegistry.getGenericWindowingEvaluator(aggregateName,
aggParameterOIs, isDistinct, isAllColumns, true);
aggParameterOIs, isDistinct, isAllColumns, true, isMapAggr);
GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo2(
genericUDAFEvaluator, udafMode, aggParameterOIs);
returnType = ((ListTypeInfo) udaf.returnType).getListElementTypeInfo();
} else {
GenericUDAFEvaluator genericUDAFEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator2(
aggregateName, aggParameterOIs, null, isDistinct, isAllColumns);
aggregateName, aggParameterOIs, null, isDistinct, isAllColumns, isMapAggr);
assert (genericUDAFEvaluator != null);
GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo2(
genericUDAFEvaluator, udafMode, aggParameterOIs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class WindowFunctionDef extends WindowExpressionDef {
String name;
boolean isStar;
boolean isDistinct;
boolean isMapAggr;
List<PTFExpressionDef> args;
WindowFrameDef windowFrame;
GenericUDAFEvaluator wFnEval;
Expand Down Expand Up @@ -62,6 +63,14 @@ public void setDistinct(boolean isDistinct) {
this.isDistinct = isDistinct;
}

public void setMapAggr(boolean mapAggr) {
isMapAggr = mapAggr;
}

public boolean isMapAggr() {
return isMapAggr;
}

public List<PTFExpressionDef> getArgs() {
return args;
}
Expand Down
Loading