From 9fb9ee06f2defa5da50ed82012727ef4c4ed8348 Mon Sep 17 00:00:00 2001 From: wuqinlong Date: Wed, 15 Feb 2017 10:45:12 +0800 Subject: [PATCH 1/3] patch HIVE-13602 and fix process position alias error when CBO is enabled and failed --- .../hive/ql/metadata/VirtualColumn.java | 11 + .../optimizer/ConstantPropagateProcCtx.java | 197 +++++++++++------- .../ConstantPropagateProcFactory.java | 41 ++++ .../hive/ql/parse/SemanticAnalyzer.java | 8 +- 4 files changed, 181 insertions(+), 76 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java index ecc5d92e5012..e6a1e7686d1b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -175,4 +176,14 @@ public static StructObjectInspector getVCSObjectInspector(List vc } return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors); } + + public static boolean isVirtualColumnBasedOnAlias(ColumnInfo column) { + // Not using method column.getIsVirtualCol() because partitioning columns + // are also treated as virtual columns in ColumnInfo. + if (column.getAlias() != null + && VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(column.getAlias().toUpperCase())) { + return true; + } + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java index 6bb2a0993e06..36fae9d7e773 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java @@ -32,7 +32,13 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.UnionOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.LimitOperator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; /** @@ -59,37 +65,6 @@ public Map, Map> getO return opToConstantExprs; } - /** - * Resolve a ColumnInfo based on given RowResolver. - * - * @param ci - * @param rr - * @param parentRR - * @return - * @throws SemanticException - */ - private ColumnInfo resolve(ColumnInfo ci, RowSchema rs, RowSchema parentRS) { - // Resolve new ColumnInfo from - String alias = ci.getAlias(); - if (alias == null) { - alias = ci.getInternalName(); - } - String tblAlias = ci.getTabAlias(); - ColumnInfo rci = rs.getColumnInfo(tblAlias, alias); - if (rci == null && rs.getTableNames().size() == 1 && - parentRS.getTableNames().size() == 1) { - rci = rs.getColumnInfo(rs.getTableNames().iterator().next(), - alias); - } - if (rci == null) { - return null; - } - LOG.debug("Resolved " - + ci.getTabAlias() + "." + ci.getAlias() + " as " - + rci.getTabAlias() + "." + rci.getAlias() + " with rs: " + rs); - return rci; - } - /** * Get propagated constant map from parents. * @@ -115,58 +90,130 @@ public Map getPropagatedConstants( return constants; } - if (op instanceof UnionOperator) { - String alias = rs.getSignature().get(0).getTabAlias(); - // find intersection - Map intersection = null; - for (Operator parent : op.getParentOperators()) { - Map unionConst = opToConstantExprs.get(parent); - LOG.debug("Constant of op " + parent.getOperatorId() + " " + unionConst); - if (intersection == null) { - intersection = new HashMap(); - for (Entry e : unionConst.entrySet()) { - ColumnInfo ci = new ColumnInfo(e.getKey()); - ci.setTabAlias(alias); - intersection.put(ci, e.getValue()); + // A previous solution is based on tableAlias and colAlias, which is + // unsafe, esp. when CBO generates derived table names. see HIVE-13602. + // For correctness purpose, we only trust colExpMap. + // We assume that CBO can do the constantPropagation before this function is + // called to help improve the performance. + // UnionOperator, LimitOperator and FilterOperator are special, they should already be + // column-position aligned. + + List> parentsToConstant = new ArrayList<>(); + boolean areAllParentsContainConstant = true; + boolean noParentsContainConstant = true; + for (Operator parent : op.getParentOperators()) { + Map constMap = opToConstantExprs.get(parent); + if (constMap == null) { + LOG.debug("Constant of Op " + parent.getOperatorId() + " is not found"); + areAllParentsContainConstant = false; + } else { + noParentsContainConstant = false; + Map map = new HashMap<>(); + for (Entry entry : constMap.entrySet()) { + map.put(parent.getSchema().getPosition(entry.getKey().getInternalName()), + entry.getValue()); + } + parentsToConstant.add(map); + LOG.debug("Constant of Op " + parent.getOperatorId() + " " + constMap); + } + } + if (noParentsContainConstant) { + return constants; + } + + ArrayList signature = op.getSchema().getSignature(); + if (op instanceof LimitOperator || op instanceof FilterOperator) { + // there should be only one parent. + if (op.getParentOperators().size() == 1) { + Map parentToConstant = parentsToConstant.get(0); + for (int index = 0; index < signature.size(); index++) { + if (parentToConstant.containsKey(index)) { + constants.put(signature.get(index), parentToConstant.get(index)); } - } else { - Iterator> itr = intersection.entrySet().iterator(); - while (itr.hasNext()) { - Entry e = itr.next(); - boolean found = false; - for (Entry f : opToConstantExprs.get(parent).entrySet()) { - if (e.getKey().getInternalName().equals(f.getKey().getInternalName())) { - if (e.getValue().isSame(f.getValue())) { - found = true; - } + } + } + } else if (op instanceof UnionOperator && areAllParentsContainConstant) { + for (int index = 0; index < signature.size(); index++) { + ExprNodeDesc constant = null; + for (Map parentToConstant : parentsToConstant) { + if (!parentToConstant.containsKey(index)) { + // if this parent does not contain a constant at this position, we + // continue to look at other positions. + constant = null; + break; + } else { + if (constant == null) { + constant = parentToConstant.get(index); + } else { + // compare if they are the same constant. + ExprNodeDesc nextConstant = parentToConstant.get(index); + if (!nextConstant.isSame(constant)) { + // they are not the same constant. for example, union all of 1 + // and 2. + constant = null; break; } } - if (!found) { - itr.remove(); - } } } - if (intersection.isEmpty()) { - return intersection; + // we have checked all the parents for the "index" position. + if (constant != null) { + constants.put(signature.get(index), constant); } } - LOG.debug("Propagated union constants:" + intersection); - return intersection; - } - - for (Operator parent : op.getParentOperators()) { - Map c = opToConstantExprs.get(parent); - for (Entry e : c.entrySet()) { - ColumnInfo ci = e.getKey(); - ColumnInfo rci = null; - ExprNodeDesc constant = e.getValue(); - rci = resolve(ci, rs, parent.getSchema()); - if (rci != null) { - constants.put(rci, constant); - } else { - LOG.debug("Can't resolve " + ci.getTabAlias() + "." + ci.getAlias() + - "(" + ci.getInternalName() + ") from rs:" + rs); + } else if (op instanceof JoinOperator) { + JoinOperator joinOp = (JoinOperator) op; + Iterator>> itr = joinOp.getConf().getExprs().entrySet() + .iterator(); + while (itr.hasNext()) { + Entry> e = itr.next(); + int tag = e.getKey(); + Operator parent = op.getParentOperators().get(tag); + List exprs = e.getValue(); + if (exprs == null) { + continue; + } + for (ExprNodeDesc expr : exprs) { + // we are only interested in ExprNodeColumnDesc + if (expr instanceof ExprNodeColumnDesc) { + String parentColName = ((ExprNodeColumnDesc) expr).getColumn(); + // find this parentColName in its parent's rs + int parentPos = parent.getSchema().getPosition(parentColName); + if (parentsToConstant.get(tag).containsKey(parentPos)) { + // this position in parent is a constant + // reverse look up colExprMap to find the childColName + if (op.getColumnExprMap() != null && op.getColumnExprMap().entrySet() != null) { + for (Entry entry : op.getColumnExprMap().entrySet()) { + if (entry.getValue().isSame(expr)) { + // now propagate the constant from the parent to the child + constants.put(signature.get(op.getSchema().getPosition(entry.getKey())), + parentsToConstant.get(tag).get(parentPos)); + } + } + } + } + } + } + } + } else { + // there should be only one parent. + if (op.getParentOperators().size() == 1) { + Operator parent = op.getParentOperators().get(0); + if (op.getColumnExprMap() != null && op.getColumnExprMap().entrySet() != null) { + for (Entry entry : op.getColumnExprMap().entrySet()) { + ExprNodeDesc expr = entry.getValue(); + if (expr instanceof ExprNodeColumnDesc) { + String parentColName = ((ExprNodeColumnDesc) expr).getColumn(); + // find this parentColName in its parent's rs + int parentPos = parent.getSchema().getPosition(parentColName); + if (parentsToConstant.get(0).containsKey(parentPos)) { + // this position in parent is a constant + // now propagate the constant from the parent to the child + constants.put(signature.get(op.getSchema().getPosition(entry.getKey())), + parentsToConstant.get(0).get(parentPos)); + } + } + } } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java index e66de1ae32c9..99e675787ec4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -782,6 +783,18 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. Map colToConstants = cppCtx.getPropagatedConstants(op); cppCtx.getOpToConstantExprs().put(op, colToConstants); + RowSchema rs = op.getSchema(); + if (op.getColumnExprMap() != null && rs != null) { + for (ColumnInfo colInfo : rs.getSignature()) { + if (!VirtualColumn.isVirtualColumnBasedOnAlias(colInfo)) { + ExprNodeDesc expr = op.getColumnExprMap().get(colInfo.getInternalName()); + if (expr instanceof ExprNodeConstantDesc) { + colToConstants.put(colInfo, expr); + } + } + } + } + if (colToConstants.isEmpty()) { return null; } @@ -819,6 +832,17 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. Operator op = (Operator) nd; Map constants = cppCtx.getPropagatedConstants(op); cppCtx.getOpToConstantExprs().put(op, constants); + RowSchema rs = op.getSchema(); + if (op.getColumnExprMap() != null && rs != null) { + for (ColumnInfo colInfo : rs.getSignature()) { + if (!VirtualColumn.isVirtualColumnBasedOnAlias(colInfo)) { + ExprNodeDesc expr = op.getColumnExprMap().get(colInfo.getInternalName()); + if (expr instanceof ExprNodeConstantDesc) { + constants.put(colInfo, expr); + } + } + } + } if (constants.isEmpty()) { return null; } @@ -870,6 +894,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. } } colList.set(i, newCol); + if (newCol instanceof ExprNodeConstantDesc && op.getSchema() != null) { + ColumnInfo colInfo = op.getSchema().getSignature().get(i); + if (!VirtualColumn.isVirtualColumnBasedOnAlias(colInfo)) { + constants.put(colInfo, newCol); + } + } if (columnExprMap != null) { columnExprMap.put(columnNames.get(i), newCol); } @@ -976,6 +1006,17 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. Map constants = cppCtx.getPropagatedConstants(op); cppCtx.getOpToConstantExprs().put(op, constants); + RowSchema rs = op.getSchema(); + if (op.getColumnExprMap() != null && rs != null) { + for (ColumnInfo colInfo : rs.getSignature()) { + if (!VirtualColumn.isVirtualColumnBasedOnAlias(colInfo)) { + ExprNodeDesc expr = op.getColumnExprMap().get(colInfo.getInternalName()); + if (expr instanceof ExprNodeConstantDesc) { + constants.put(colInfo, expr); + } + } + } + } if (constants.isEmpty()) { return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index b09bde4dc77b..6965f6e837da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -282,6 +282,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { //flag for partial scan during analyze ... compute statistics protected boolean partialscan; + protected boolean isAliasProcessed; + protected volatile boolean disableJoinMerge = false; /* @@ -339,6 +341,7 @@ public SemanticAnalyzer(HiveConf conf) throws SemanticException { globalLimitCtx = new GlobalLimitCtx(); viewAliasToInput = new HashMap(); noscan = partialscan = false; + isAliasProcessed = false; } @Override @@ -10044,7 +10047,10 @@ boolean genResolvedParseTree(ASTNode ast, PlannerContext plannerCtx) throws Sema ctesExpanded = new ArrayList(); // 1. analyze and process the position alias - processPositionAlias(ast); + if (!isAliasProcessed) { + processPositionAlias(ast); + isAliasProcessed = true; + } // 2. analyze create table command if (ast.getToken().getType() == HiveParser.TOK_CREATETABLE) { From cf800cca665baa8205f0c5cb78b3e851fbf74b16 Mon Sep 17 00:00:00 2001 From: wuqinlong Date: Wed, 15 Feb 2017 13:17:50 +0800 Subject: [PATCH 2/3] patch HIVE-13602 and fix process position alias error when CBO is enabled and failed --- .../apache/hadoop/hive/ql/parse/SemanticAnalyzer.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 6965f6e837da..69cd5c5bff91 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -10047,10 +10047,7 @@ boolean genResolvedParseTree(ASTNode ast, PlannerContext plannerCtx) throws Sema ctesExpanded = new ArrayList(); // 1. analyze and process the position alias - if (!isAliasProcessed) { - processPositionAlias(ast); - isAliasProcessed = true; - } + // step out position alias process // 2. analyze create table command if (ast.getToken().getType() == HiveParser.TOK_CREATETABLE) { @@ -10136,6 +10133,10 @@ Operator genOPTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticExcept void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticException { // 1. Generate Resolved Parse tree from syntax tree LOG.info("Starting Semantic Analysis"); + + //change the location of position alias process here + processPositionAlias(ast); + if (!genResolvedParseTree(ast, plannerCtx)) { return; } From db5cc6a22182c7684ba6a54b38d682b4bb1fb184 Mon Sep 17 00:00:00 2001 From: wuqinlong Date: Wed, 15 Feb 2017 13:20:34 +0800 Subject: [PATCH 3/3] patch HIVE-13602 and fix process position alias error when CBO is enabled and failed --- .../java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 69cd5c5bff91..6a357e0ab9d8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -282,8 +282,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { //flag for partial scan during analyze ... compute statistics protected boolean partialscan; - protected boolean isAliasProcessed; - protected volatile boolean disableJoinMerge = false; /* @@ -341,7 +339,6 @@ public SemanticAnalyzer(HiveConf conf) throws SemanticException { globalLimitCtx = new GlobalLimitCtx(); viewAliasToInput = new HashMap(); noscan = partialscan = false; - isAliasProcessed = false; } @Override