From ed1af84c24d7f6e80de504ea13baee1ad2cf7c9a Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 8 Feb 2023 13:54:37 +0800 Subject: [PATCH 1/4] fix wrong objectinspector when enable map-side aggregation --- .../hadoop/hive/ql/exec/FunctionRegistry.java | 8 ++++---- .../org/apache/hadoop/hive/ql/exec/Registry.java | 8 ++++---- .../ql/optimizer/CountDistinctRewriteProc.java | 6 ++++-- .../DynamicPartitionPruningOptimization.java | 14 ++++++++------ .../translator/opconventer/HiveGBOpConvUtil.java | 4 +++- .../hadoop/hive/ql/parse/CalcitePlanner.java | 6 ++++-- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 14 ++++++++------ .../hadoop/hive/ql/parse/type/FunctionHelper.java | 4 ++-- .../hive/ql/parse/type/HiveFunctionHelper.java | 10 +++++----- .../udf/generic/AbstractGenericUDAFResolver.java | 8 +++++++- .../ql/udf/generic/GenericUDAFCollectList.java | 2 +- .../hive/ql/udf/generic/GenericUDAFCollectSet.java | 2 +- .../generic/GenericUDAFMkCollectionEvaluator.java | 11 +++++++---- .../ql/udf/generic/GenericUDAFParameterInfo.java | 2 ++ .../generic/SimpleGenericUDAFParameterInfo.java | 11 +++++++++-- .../apache/hadoop/hive/ql/exec/TestOperators.java | 2 +- .../hive/ql/optimizer/physical/TestVectorizer.java | 2 +- 17 files changed, 71 insertions(+), 43 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index cb5aa5b96783..40dbde2ede12 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -1241,7 +1241,7 @@ public static TypeInfo getCommonClassForStruct(StructTypeInfo a, StructTypeInfo @SuppressWarnings("deprecation") public static GenericUDAFEvaluator getGenericUDAFEvaluator(String name, List argumentOIs, boolean isDistinct, - boolean isAllColumns) throws SemanticException { + boolean isAllColumns, boolean isMapAggr) throws SemanticException { GenericUDAFResolver 
udafResolver = getGenericUDAFResolver(name); if (udafResolver == null) { @@ -1257,7 +1257,7 @@ public static GenericUDAFEvaluator getGenericUDAFEvaluator(String name, GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo( - args, false, isDistinct, isAllColumns); + args, false, isDistinct, isAllColumns, true, isMapAggr); GenericUDAFEvaluator udafEvaluator; if (udafResolver instanceof GenericUDAFResolver2) { @@ -1274,9 +1274,9 @@ public static GenericUDAFEvaluator getGenericWindowingEvaluator(String name, boolean isAllColumns, boolean respectNulls) throws SemanticException { Registry registry = SessionState.getRegistry(); GenericUDAFEvaluator evaluator = registry == null ? null : - registry.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls); + registry.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls, false); return evaluator != null ? evaluator : - system.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls); + system.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls, false); } public static GenericUDAFResolver getGenericUDAFResolver(String functionName) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java index d70cccd297a3..a2723bc58f6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java @@ -477,7 +477,7 @@ public void getFunctionSynonyms( @SuppressWarnings("deprecation") public GenericUDAFEvaluator getGenericUDAFEvaluator(String name, List argumentOIs, boolean isWindowing, boolean isDistinct, - boolean isAllColumns, boolean respectNulls) throws SemanticException { + boolean isAllColumns, boolean respectNulls, boolean isMapAggr) throws SemanticException { GenericUDAFResolver udafResolver = getGenericUDAFResolver(name); if (udafResolver == null) { 
@@ -494,7 +494,7 @@ public GenericUDAFEvaluator getGenericUDAFEvaluator(String name, GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo( - args, isWindowing, isDistinct, isAllColumns, respectNulls); + args, isWindowing, isDistinct, isAllColumns, respectNulls, isMapAggr); if (udafResolver instanceof GenericUDAFResolver2) { udafEvaluator = ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo); @@ -505,7 +505,7 @@ public GenericUDAFEvaluator getGenericUDAFEvaluator(String name, } public GenericUDAFEvaluator getGenericWindowingEvaluator(String functionName, - List argumentOIs, boolean isDistinct, boolean isAllColumns, boolean respectNulls) + List argumentOIs, boolean isDistinct, boolean isAllColumns, boolean respectNulls, boolean isMapAggr) throws SemanticException { functionName = functionName.toLowerCase(); WindowFunctionInfo info = getWindowFunctionInfo(functionName); @@ -514,7 +514,7 @@ public GenericUDAFEvaluator getGenericWindowingEvaluator(String functionName, } if (!functionName.equals(FunctionRegistry.LEAD_FUNC_NAME) && !functionName.equals(FunctionRegistry.LAG_FUNC_NAME)) { - return getGenericUDAFEvaluator(functionName, argumentOIs, true, isDistinct, isAllColumns, respectNulls); + return getGenericUDAFEvaluator(functionName, argumentOIs, true, isDistinct, isAllColumns, respectNulls, isMapAggr); } // this must be lead/lag UDAF diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CountDistinctRewriteProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CountDistinctRewriteProc.java index 32edacba7c3e..a4661941814d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CountDistinctRewriteProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CountDistinctRewriteProc.java @@ -375,8 +375,9 @@ private GroupByOperator genMapGroupby3(GroupByOperator mGby2, aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(), paraExpression, paraExprInfo.getTabAlias(), paraExprInfo.getIsVirtualCol())); Mode amode = 
SemanticAnalyzer.groupByDescModeToUDAFMode(GroupByDesc.Mode.HASH, false); + Boolean isMapAggr = HiveConf.getBoolVar(pGraphContext.getConf(), HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE); GenericUDAFEvaluator genericUDAFEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator( - "count", aggParameters, null, false, false); + "count", aggParameters, null, false, false, isMapAggr); assert (genericUDAFEvaluator != null); GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo(genericUDAFEvaluator, amode, aggParameters); @@ -467,8 +468,9 @@ private GroupByOperator genReduceGroupby(ReduceSinkOperator rs2, ArrayList aggParameters = new ArrayList(); aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(), paraExpression, paraExprInfo .getTabAlias(), paraExprInfo.getIsVirtualCol())); + Boolean isMapAggr = HiveConf.getBoolVar(pGraphContext.getConf(), HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE); GenericUDAFEvaluator genericUDAFEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator("count", - aggParameters, null, false, false); + aggParameters, null, false, false, isMapAggr); assert (genericUDAFEvaluator != null); Mode amode = SemanticAnalyzer.groupByDescModeToUDAFMode(GroupByDesc.Mode.MERGEPARTIAL, false); GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo(genericUDAFEvaluator, amode, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java index d19c61d4c189..95b4f0375612 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java @@ -633,16 +633,17 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex ArrayList aggs = new ArrayList(); try { + Boolean isMapAggr = HiveConf.getBoolVar(parseContext.getConf(), HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE); AggregationDesc min = new 
AggregationDesc("min", - FunctionRegistry.getGenericUDAFEvaluator("min", aggFnOIs, false, false), + FunctionRegistry.getGenericUDAFEvaluator("min", aggFnOIs, false, false, isMapAggr), params, false, Mode.PARTIAL1); AggregationDesc max = new AggregationDesc("max", - FunctionRegistry.getGenericUDAFEvaluator("max", aggFnOIs, false, false), + FunctionRegistry.getGenericUDAFEvaluator("max", aggFnOIs, false, false, isMapAggr), params, false, Mode.PARTIAL1); // we don't add numThreads here since PARTIAL1 mode is for VectorUDAFBloomFilter which does // not support numThreads parameter AggregationDesc bloomFilter = new AggregationDesc(BLOOM_FILTER_FUNCTION, - FunctionRegistry.getGenericUDAFEvaluator(BLOOM_FILTER_FUNCTION, aggFnOIs, false, false), + FunctionRegistry.getGenericUDAFEvaluator(BLOOM_FILTER_FUNCTION, aggFnOIs, false, false, isMapAggr), params, false, Mode.PARTIAL1); GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator(); @@ -746,17 +747,18 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex TypeInfo intTypeInfo = TypeInfoFactory.getPrimitiveTypeInfoFromJavaPrimitive(Integer.TYPE); bloomFilterFinalParams.add(new ExprNodeConstantDesc(intTypeInfo, numThreads)); + Boolean isMapAggr = HiveConf.getBoolVar(parseContext.getConf(), HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE); AggregationDesc min = new AggregationDesc("min", FunctionRegistry.getGenericUDAFEvaluator("min", minFinalFnOIs, - false, false), + false, false, isMapAggr), minFinalParams, false, Mode.FINAL); AggregationDesc max = new AggregationDesc("max", FunctionRegistry.getGenericUDAFEvaluator("max", maxFinalFnOIs, - false, false), + false, false, isMapAggr), maxFinalParams, false, Mode.FINAL); AggregationDesc bloomFilter = new AggregationDesc(BLOOM_FILTER_FUNCTION, FunctionRegistry.getGenericUDAFEvaluator(BLOOM_FILTER_FUNCTION, bloomFilterFinalFnOIs, - false, false), + false, false, isMapAggr), bloomFilterFinalParams, 
false, Mode.FINAL); GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator(); bloomFilterEval.setSourceOperator(selectOp); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java index 34d4ee380c65..a23b222f739e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java @@ -274,11 +274,13 @@ private static GBInfo getGBInfo(HiveAggregate aggRel, OpAttr inputOpAf, HiveConf gbInfo.distColIndices.add(distColIndicesOfUDAF); } + Boolean isMapAggr = HiveConf.getBoolVar(hc, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE); // special handling for count, similar to PlanModifierForASTConv::replaceEmptyGroupAggr() udafAttrs.udafEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator(udafAttrs.udafName, new ArrayList(udafAttrs.udafParams), new ASTNode(), udafAttrs.isDistinctUDAF, udafAttrs.udafParams.size() == 0 && - "count".equalsIgnoreCase(udafAttrs.udafName) ? true : false); + "count".equalsIgnoreCase(udafAttrs.udafName) ? 
true : false, + isMapAggr); gbInfo.udafAttrs.add(udafAttrs); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 7f274cadcbf3..bb668d0d7079 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -3698,8 +3698,9 @@ private AggregateInfo getHiveAggInfo(ASTNode aggAst, int aggFnLstArgIndx, RowRes boolean isAllColumns = aggAst.getType() == HiveParser.TOK_FUNCTIONSTAR; String aggName = unescapeIdentifier(aggAst.getChild(0).getText()); + Boolean isMapAggr = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE); AggregateInfo aInfo = functionHelper.getWindowAggregateFunctionInfo( - isDistinct, isAllColumns, aggName, aggParameters); + isDistinct, isAllColumns, isMapAggr, aggName, aggParameters); // If that did not work, try GenericUDF translation if (aInfo == null) { @@ -3854,8 +3855,9 @@ private RelNode genGBLogicalPlan(QB qb, RelNode srcRel) throws SemanticException aggParameters.add(parameterExpr); } + Boolean isMapAggr = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE); AggregateInfo aInfo = functionHelper.getAggregateFunctionInfo( - isDistinct, isAllColumns, aggName, aggParameters, fieldCollations); + isDistinct, isAllColumns, isMapAggr, aggName, aggParameters, fieldCollations); aggregations.add(aInfo); String field = getColumnInternalName(groupingColsSize + aggregations.size() - 1); outputColumnNames.add(field); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index c709209d9d88..cab726f1a3ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -4985,18 +4985,18 @@ static List getWritableObjectInspector(List exprs */ public static GenericUDAFEvaluator 
getGenericUDAFEvaluator(String aggName, List aggParameters, ASTNode aggTree, - boolean isDistinct, boolean isAllColumns) + boolean isDistinct, boolean isAllColumns, boolean isMapAggr) throws SemanticException { return getGenericUDAFEvaluator2(aggName, getWritableObjectInspector(aggParameters), - aggTree, isDistinct, isAllColumns); + aggTree, isDistinct, isAllColumns, isMapAggr); } public static GenericUDAFEvaluator getGenericUDAFEvaluator2(String aggName, List aggParameterOIs, ASTNode aggTree, - boolean isDistinct, boolean isAllColumns) + boolean isDistinct, boolean isAllColumns, boolean isMapAggr) throws SemanticException { GenericUDAFEvaluator result = FunctionRegistry.getGenericUDAFEvaluator( - aggName, aggParameterOIs, isDistinct, isAllColumns); + aggName, aggParameterOIs, isDistinct, isAllColumns, isMapAggr); if (null == result) { String reason = "Looking for UDAF Evaluator\"" + aggName + "\" with parameters " + aggParameterOIs; @@ -5224,9 +5224,10 @@ private Operator genGroupByPlanGroupByOperator(QBParseInfo parseInfo, if (isDistinct) { numDistinctUDFs++; } + Boolean isMapAggr = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE); Mode amode = groupByDescModeToUDAFMode(mode, isDistinct); GenericUDAFEvaluator genericUDAFEvaluator = getGenericUDAFEvaluator( - aggName, aggParameters, value, isDistinct, isAllColumns); + aggName, aggParameters, value, isDistinct, isAllColumns, isMapAggr); assert (genericUDAFEvaluator != null); GenericUDAFInfo udaf = getGenericUDAFInfo(genericUDAFEvaluator, amode, aggParameters); aggregations.add(new AggregationDesc(aggName.toLowerCase(), @@ -5672,8 +5673,9 @@ private Operator genGroupByPlanMapGroupByOperator(QB qb, boolean isAllColumns = value.getType() == HiveParser.TOK_FUNCTIONSTAR; Mode amode = groupByDescModeToUDAFMode(GroupByDesc.Mode.HASH, isDistinct); + Boolean isMapAggr = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE); GenericUDAFEvaluator genericUDAFEvaluator = getGenericUDAFEvaluator( - 
aggName, aggParameters, value, isDistinct, isAllColumns); + aggName, aggParameters, value, isDistinct, isAllColumns, isMapAggr); assert (genericUDAFEvaluator != null); GenericUDAFInfo udaf = getGenericUDAFInfo(genericUDAFEvaluator, amode, aggParameters); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/type/FunctionHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/type/FunctionHelper.java index c74c2aad93b1..fa483576f66c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/type/FunctionHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/type/FunctionHelper.java @@ -67,14 +67,14 @@ RexNode getExpression(String functionText, FunctionInfo functionInfo, /** * Returns aggregation information based on given parameters. */ - AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, + AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, boolean isMapAggr, String aggregateName, List aggregateParameters, List fieldCollations) throws SemanticException; /** * Returns aggregation information for analytical function based on given parameters. 
*/ - AggregateInfo getWindowAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, + AggregateInfo getWindowAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, boolean isMapAggr, String aggregateName, List aggregateParameters) throws SemanticException; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/type/HiveFunctionHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/type/HiveFunctionHelper.java index 69327390bfe6..fa9b701b7c0b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/type/HiveFunctionHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/type/HiveFunctionHelper.java @@ -367,7 +367,7 @@ public Void visitCall(final RexCall call) { * {@inheritDoc} */ @Override - public AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, + public AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, boolean isMapAggr, String aggregateName, List aggregateParameters, List fieldCollations) throws SemanticException { @@ -387,7 +387,7 @@ public AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllC } } GenericUDAFEvaluator genericUDAFEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator2( - aggregateName, aggParameterOIs, null, isDistinct, isAllColumns); + aggregateName, aggParameterOIs, null, isDistinct, isAllColumns, isMapAggr); assert (genericUDAFEvaluator != null); GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo2( genericUDAFEvaluator, udafMode, aggParameterOIs); @@ -398,7 +398,7 @@ public AggregateInfo getAggregateFunctionInfo(boolean isDistinct, boolean isAllC * {@inheritDoc} */ @Override - public AggregateInfo getWindowAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, + public AggregateInfo getWindowAggregateFunctionInfo(boolean isDistinct, boolean isAllColumns, boolean isMapAggr, String aggregateName, List aggregateParameters) throws SemanticException { TypeInfo returnType = null; @@ -422,13 +422,13 @@ public AggregateInfo 
getWindowAggregateFunctionInfo(boolean isDistinct, boolean if (aggregateName.toLowerCase().equals(FunctionRegistry.LEAD_FUNC_NAME) || aggregateName.toLowerCase().equals(FunctionRegistry.LAG_FUNC_NAME)) { GenericUDAFEvaluator genericUDAFEvaluator = FunctionRegistry.getGenericWindowingEvaluator(aggregateName, - aggParameterOIs, isDistinct, isAllColumns, true); + aggParameterOIs, isDistinct, isAllColumns, true, isMapAggr); GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo2( genericUDAFEvaluator, udafMode, aggParameterOIs); returnType = ((ListTypeInfo) udaf.returnType).getListElementTypeInfo(); } else { GenericUDAFEvaluator genericUDAFEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator2( - aggregateName, aggParameterOIs, null, isDistinct, isAllColumns); + aggregateName, aggParameterOIs, null, isDistinct, isAllColumns, isMapAggr); assert (genericUDAFEvaluator != null); GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo2( genericUDAFEvaluator, udafMode, aggParameterOIs); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/AbstractGenericUDAFResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/AbstractGenericUDAFResolver.java index d82e6e47ac30..a2e1fdad6923 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/AbstractGenericUDAFResolver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/AbstractGenericUDAFResolver.java @@ -35,6 +35,8 @@ public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 { + private boolean isMapAggr = false; + @SuppressWarnings("deprecation") @Override public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) @@ -44,7 +46,7 @@ public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throw new SemanticException( "The specified syntax for UDAF invocation is invalid."); } - + this.isMapAggr = info.isMapAggr(); return getEvaluator(info.getParameters()); } @@ -54,4 +56,8 @@ public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throw 
new SemanticException( "This UDAF does not support the deprecated getEvaluator() method."); } + + public boolean isMapAggr() { + return this.isMapAggr; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectList.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectList.java index 8dc93f331063..e4400a477c82 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectList.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectList.java @@ -49,6 +49,6 @@ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) "Only primitive, struct, list or map type arguments are accepted but " + parameters[0].getTypeName() + " was passed as parameter 1."); } - return new GenericUDAFMkCollectionEvaluator(BufferType.LIST); + return new GenericUDAFMkCollectionEvaluator(BufferType.LIST, isMapAggr()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java index 6cd0c1c3941f..7e2c6a380e6c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java @@ -50,7 +50,7 @@ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) "Only primitive, struct, list or map type arguments are accepted but " + parameters[0].getTypeName() + " was passed as parameter 1."); } - return new GenericUDAFMkCollectionEvaluator(BufferType.SET); + return new GenericUDAFMkCollectionEvaluator(BufferType.SET, isMapAggr()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMkCollectionEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMkCollectionEvaluator.java index cffc7f765100..467c2cae00f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMkCollectionEvaluator.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMkCollectionEvaluator.java @@ -47,13 +47,15 @@ enum BufferType { SET, LIST } private transient ListObjectInspector internalMergeOI; private BufferType bufferType; + private boolean isMapAggr; //needed by kyro public GenericUDAFMkCollectionEvaluator() { } - public GenericUDAFMkCollectionEvaluator(BufferType bufferType){ + public GenericUDAFMkCollectionEvaluator(BufferType bufferType, boolean isMapAggr) { this.bufferType = bufferType; + this.isMapAggr = isMapAggr; } @Override @@ -67,10 +69,11 @@ public ObjectInspector init(Mode m, ObjectInspector[] parameters) return ObjectInspectorFactory.getStandardListObjectInspector( ObjectInspectorUtils.getStandardObjectInspector(inputOI)); } else { - if (!(parameters[0] instanceof ListObjectInspector)) { + if (!isMapAggr) { //no map aggregation. - inputOI = ObjectInspectorUtils.getStandardObjectInspector(parameters[0]); - return ObjectInspectorFactory.getStandardListObjectInspector(inputOI); + inputOI = parameters[0]; + return ObjectInspectorFactory.getStandardListObjectInspector( + ObjectInspectorUtils.getStandardObjectInspector(inputOI)); } else { internalMergeOI = (ListObjectInspector) parameters[0]; inputOI = internalMergeOI.getListElementObjectInspector(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java index 1afa1ebf007a..fb5b5ede5fb1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java @@ -89,4 +89,6 @@ public interface GenericUDAFParameterInfo { boolean isAllColumns(); boolean respectNulls(); + + boolean isMapAggr(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java index 
b0c75affa969..deb7e364ec35 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java @@ -33,19 +33,21 @@ public class SimpleGenericUDAFParameterInfo implements GenericUDAFParameterInfo private final boolean distinct; private final boolean allColumns; private final boolean respectNulls; + private final boolean isMapAggr; public SimpleGenericUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) { - this(params, isWindowing, distinct, allColumns, true); + this(params, isWindowing, distinct, allColumns, true, false); } public SimpleGenericUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, - boolean allColumns, boolean respectNulls) { + boolean allColumns, boolean respectNulls, boolean isMapAggr) { this.parameters = params; this.isWindowing = isWindowing; this.distinct = distinct; this.allColumns = allColumns; this.respectNulls = respectNulls; + this.isMapAggr = isMapAggr; } @Override @@ -82,4 +84,9 @@ public boolean isWindowing() { public boolean respectNulls() { return respectNulls; } + + @Override + public boolean isMapAggr() { + return isMapAggr; + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index c82fdf3a1d9a..db1c37786e9e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -544,7 +544,7 @@ public void testLlapMemoryOversubscriptionMaxExecutorsPerQueryCalculation() { ArrayList params = new ArrayList(); params.add(inputColumn); GenericUDAFEvaluator genericUDAFEvaluator = - SemanticAnalyzer.getGenericUDAFEvaluator(aggregate, params, null, false, false); + SemanticAnalyzer.getGenericUDAFEvaluator(aggregate, params, null, false, false, false); AggregationDesc agg = new 
AggregationDesc(aggregate, genericUDAFEvaluator, params, false, GenericUDAFEvaluator.Mode.PARTIAL1); ArrayList aggs = new ArrayList(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java index 5076a79d9d66..1cfdac460be8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java @@ -96,7 +96,7 @@ public void testAggregateOnUDF() throws HiveException, VectorizerCannotVectorize paramOIs.add(exprNodeDesc.getWritableObjectInspector()); AggregationDesc aggDesc = new AggregationDesc("sum", - FunctionRegistry.getGenericUDAFEvaluator("sum", paramOIs, false, false), + FunctionRegistry.getGenericUDAFEvaluator("sum", paramOIs, false, false, false), params, false, GenericUDAFEvaluator.Mode.PARTIAL1); From 519ff08559ffea32b26d6799f814eaee2419265c Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 8 Feb 2023 15:31:32 +0800 Subject: [PATCH 2/4] update --- .../org/apache/hadoop/hive/ql/exec/FunctionRegistry.java | 6 +++--- ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java | 2 +- .../org/apache/hadoop/hive/ql/parse/PTFTranslator.java | 4 +++- .../hadoop/hive/ql/plan/ptf/WindowFunctionDef.java | 9 +++++++++ 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index 40dbde2ede12..ddb6696b29b0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -1271,12 +1271,12 @@ public static GenericUDAFEvaluator getGenericUDAFEvaluator(String name, public static GenericUDAFEvaluator getGenericWindowingEvaluator(String name, List argumentOIs, boolean isDistinct, - boolean isAllColumns, boolean respectNulls) throws 
SemanticException { + boolean isAllColumns, boolean respectNulls, boolean isMapAggr) throws SemanticException { Registry registry = SessionState.getRegistry(); GenericUDAFEvaluator evaluator = registry == null ? null : - registry.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls, false); + registry.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls, isMapAggr); return evaluator != null ? evaluator : - system.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls, false); + system.getGenericWindowingEvaluator(name, argumentOIs, isDistinct, isAllColumns, respectNulls, isMapAggr); } public static GenericUDAFResolver getGenericUDAFResolver(String functionName) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java index a2723bc58f6d..f95a9aabf092 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java @@ -521,7 +521,7 @@ public GenericUDAFEvaluator getGenericWindowingEvaluator(String functionName, ObjectInspector args[] = new ObjectInspector[argumentOIs.size()]; GenericUDAFResolver udafResolver = info.getGenericUDAFResolver(); GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo( - argumentOIs.toArray(args), true, isDistinct, isAllColumns); + argumentOIs.toArray(args), true, isDistinct, isAllColumns, true, isMapAggr); return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java index d69be80cd150..ad74564c4083 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java @@ -346,10 +346,12 @@ private WindowFunctionDef translate(WindowTableFunctionDef wdwTFnDef, if 
(wFnInfo == null) { throw new SemanticException(ErrorMsg.INVALID_FUNCTION.getMsg(spec.getName())); } + Boolean isMapAggr = HiveConf.getBoolVar(hCfg, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE); WindowFunctionDef def = new WindowFunctionDef(); def.setName(spec.getName()); def.setAlias(spec.getAlias()); def.setDistinct(spec.isDistinct()); + def.setMapAggr(isMapAggr); def.setExpressionTreeString(spec.getExpression().toStringTree()); def.setStar(spec.isStar()); def.setPivotResult(wFnInfo.isPivotResult()); @@ -583,7 +585,7 @@ static void setupWdwFnEvaluator(WindowFunctionDef def) throws HiveException { GenericUDAFEvaluator wFnEval = FunctionRegistry.getGenericWindowingEvaluator(def.getName(), argOIs, - def.isDistinct(), def.isStar(), def.respectNulls()); + def.isDistinct(), def.isStar(), def.respectNulls(), def.isMapAggr()); ObjectInspector OI = wFnEval.init(GenericUDAFEvaluator.Mode.COMPLETE, funcArgOIs); def.setWFnEval(wFnEval); def.setOI(OI); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java index a88e1eda5915..ba854685212b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java @@ -29,6 +29,7 @@ public class WindowFunctionDef extends WindowExpressionDef { String name; boolean isStar; boolean isDistinct; + boolean isMapAggr; List args; WindowFrameDef windowFrame; GenericUDAFEvaluator wFnEval; @@ -62,6 +63,14 @@ public void setDistinct(boolean isDistinct) { this.isDistinct = isDistinct; } + public void setMapAggr(boolean mapAggr) { + isMapAggr = mapAggr; + } + + public boolean isMapAggr() { + return isMapAggr; + } + public List getArgs() { return args; } From 9037bc12c3d8f02271fae6c2779bf9d30ad95e3d Mon Sep 17 00:00:00 2001 From: uncleGen Date: Thu, 9 Feb 2023 20:34:48 +0800 Subject: [PATCH 3/4] fix ut --- .../vectorization/operators/VectorGroupByOperatorBench.java | 
2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/operators/VectorGroupByOperatorBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/operators/VectorGroupByOperatorBench.java index 1f87f8df1a82..ab50a1294ed9 100644 --- a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/operators/VectorGroupByOperatorBench.java +++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/operators/VectorGroupByOperatorBench.java @@ -185,7 +185,7 @@ private AggregationDesc buildAggregationDesc( AggregationDesc agg = new AggregationDesc(); ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo); GenericUDAFEvaluator genericUDAFEvaluator = FunctionRegistry.getGenericUDAFEvaluator(aggregate, - ImmutableList.of(oi).asList(), false, false); + ImmutableList.of(oi).asList(), false, false, false); agg.setGenericUDAFEvaluator(genericUDAFEvaluator); if (aggregate.equals("bloom_filter")) { GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator udafBloomFilterEvaluator = From 41694d7bc6bf6c1f9d5deb1b868da17dbfb12817 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Mon, 13 Feb 2023 19:13:01 +0800 Subject: [PATCH 4/4] add ut --- .../collect_list_disable_map_aggr.q | 10 +++++ .../collect_set_disable_map_aggr.q | 10 +++++ .../collect_list_disable_map_aggr.q.out | 44 +++++++++++++++++++ .../collect_set_disable_map_aggr.q.out | 44 +++++++++++++++++++ 4 files changed, 108 insertions(+) create mode 100644 ql/src/test/queries/clientpositive/collect_list_disable_map_aggr.q create mode 100644 ql/src/test/queries/clientpositive/collect_set_disable_map_aggr.q create mode 100644 ql/src/test/results/clientpositive/collect_list_disable_map_aggr.q.out create mode 100644 ql/src/test/results/clientpositive/collect_set_disable_map_aggr.q.out diff --git a/ql/src/test/queries/clientpositive/collect_list_disable_map_aggr.q 
b/ql/src/test/queries/clientpositive/collect_list_disable_map_aggr.q new file mode 100644 index 000000000000..7f36e0d24409 --- /dev/null +++ b/ql/src/test/queries/clientpositive/collect_list_disable_map_aggr.q @@ -0,0 +1,10 @@ +set hive.map.aggr=false; + +drop table tb1; + +create table tb1 (key int, startTime string, endTime string); + +insert into tb1 values (1, "100", "101"); +insert into tb1 values (2, "200", "201"); + +select key, collect_list(map("startTime",startTime,"endTime",endTime)) as col1 from tb1 group by key order by key; \ No newline at end of file diff --git a/ql/src/test/queries/clientpositive/collect_set_disable_map_aggr.q b/ql/src/test/queries/clientpositive/collect_set_disable_map_aggr.q new file mode 100644 index 000000000000..1b6a69bdf98d --- /dev/null +++ b/ql/src/test/queries/clientpositive/collect_set_disable_map_aggr.q @@ -0,0 +1,10 @@ +set hive.map.aggr=false; + +drop table tb1; + +create table tb1 (key int, startTime string, endTime string); + +insert into tb1 values (1, "100", "101"); +insert into tb1 values (2, "200", "201"); + +select key, collect_set(array(startTime, endTime)) as col1 from tb1 group by key order by key; \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/collect_list_disable_map_aggr.q.out b/ql/src/test/results/clientpositive/collect_list_disable_map_aggr.q.out new file mode 100644 index 000000000000..e49440538d68 --- /dev/null +++ b/ql/src/test/results/clientpositive/collect_list_disable_map_aggr.q.out @@ -0,0 +1,44 @@ +PREHOOK: query: drop table tb1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table tb1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table tb1 (key int, startTime string, endTime string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tb1 +POSTHOOK: query: create table tb1 (key int, startTime string, endTime string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tb1 +PREHOOK: query: 
insert into tb1 values (1, "100", "101") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tb1 +POSTHOOK: query: insert into tb1 values (1, "100", "101") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tb1 +POSTHOOK: Lineage: tb1.endtime SCRIPT [] +POSTHOOK: Lineage: tb1.key SCRIPT [] +POSTHOOK: Lineage: tb1.starttime SCRIPT [] +PREHOOK: query: insert into tb1 values (2, "200", "201") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tb1 +POSTHOOK: query: insert into tb1 values (2, "200", "201") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tb1 +POSTHOOK: Lineage: tb1.endtime SCRIPT [] +POSTHOOK: Lineage: tb1.key SCRIPT [] +POSTHOOK: Lineage: tb1.starttime SCRIPT [] +PREHOOK: query: select key, collect_list(map("startTime",startTime,"endTime",endTime)) as col1 from tb1 group by key order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@tb1 +#### A masked pattern was here #### +POSTHOOK: query: select key, collect_list(map("startTime",startTime,"endTime",endTime)) as col1 from tb1 group by key order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tb1 +#### A masked pattern was here #### +1 [{"startTime":"100","endTime":"101"}] +2 [{"startTime":"200","endTime":"201"}] diff --git a/ql/src/test/results/clientpositive/collect_set_disable_map_aggr.q.out b/ql/src/test/results/clientpositive/collect_set_disable_map_aggr.q.out new file mode 100644 index 000000000000..fd18b37bd18c --- /dev/null +++ b/ql/src/test/results/clientpositive/collect_set_disable_map_aggr.q.out @@ -0,0 +1,44 @@ +PREHOOK: query: drop table tb1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table tb1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table tb1 (key int, startTime string, endTime string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tb1 
+POSTHOOK: query: create table tb1 (key int, startTime string, endTime string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tb1 +PREHOOK: query: insert into tb1 values (1, "100", "101") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tb1 +POSTHOOK: query: insert into tb1 values (1, "100", "101") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tb1 +POSTHOOK: Lineage: tb1.endtime SCRIPT [] +POSTHOOK: Lineage: tb1.key SCRIPT [] +POSTHOOK: Lineage: tb1.starttime SCRIPT [] +PREHOOK: query: insert into tb1 values (2, "200", "201") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tb1 +POSTHOOK: query: insert into tb1 values (2, "200", "201") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tb1 +POSTHOOK: Lineage: tb1.endtime SCRIPT [] +POSTHOOK: Lineage: tb1.key SCRIPT [] +POSTHOOK: Lineage: tb1.starttime SCRIPT [] +PREHOOK: query: select key, collect_set(array(startTime, endTime)) as col1 from tb1 group by key order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@tb1 +#### A masked pattern was here #### +POSTHOOK: query: select key, collect_set(array(startTime, endTime)) as col1 from tb1 group by key order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tb1 +#### A masked pattern was here #### +1 [["100","101"]] +2 [["200","201"]]