HIVE-28208: WITH column list doesn't work with CTE materialization #5232

Open · wants to merge 1 commit into base: master
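For context, a sketch of the affected scenario, assembled from the tests this PR adds (the table t1, its columns, and the settings below are taken from those tests):

set hive.optimize.cte.materialize.threshold=1;
set hive.optimize.cte.materialize.full.aggregate.only=false;

-- The list (a, b) relabels the CTE's output columns. Before this patch,
-- the list was dropped when the CTE was materialized into a temporary
-- table, so the WITH column list did not take effect under materialization.
with cte1(a, b) as (select int_col, bigint_col from t1)
select a, b from cte1;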
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
@@ -103,6 +103,7 @@ public class CreateTableDesc extends DDLDescWithTableProperties implements Serializable {
   List<String> skewedColNames;
   List<List<String>> skewedColValues;
   boolean isStoredAsSubDirectories = false;
+  private List<String> withColList;
   private boolean replaceMode = false;
   private ReplicationSpec replicationSpec = null;
   private boolean isCTAS = false;
@@ -652,6 +653,20 @@ public void setMaterialization(boolean isMaterialization) {
     this.isMaterialization = isMaterialization;
   }
 
+  /**
+   * @return the with-column-list of this CTE
+   */
+  public List<String> getWithColList() {
+    return withColList;
+  }
+
+  /**
+   * @param withColList the column list
+   */
+  public void setWithColList(List<String> withColList) {
+    this.withColList = withColList;
+  }
+
   /**
    * @param replaceMode Determine if this CreateTable should behave like a replace-into alter instead
    */
ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -1048,6 +1048,9 @@ Table materializeCTE(String cteName, CTEClause cte) throws HiveException {

     createTable.addChild(tableName);
     createTable.addChild(temporary);
+    if (cte.withColList != null) {
+      createTable.addChild(cte.withColList);
+    }
     createTable.addChild(cte.cteNode);
 
     CalcitePlanner analyzer = new CalcitePlanner(queryState);
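Both materializeCTE implementations (CalcitePlanner above and SemanticAnalyzer below) now forward the CTE's column-list AST node into the generated CREATE TABLE. Conceptually, and only as a hedged sketch (materialization builds the AST directly, and this column-list flavor of CTAS is accepted solely for internal materialization, as the TOK_TABCOLNAME handling below enforces), the materialized table is defined as if by:

-- Hypothetical SQL equivalent of the AST built when materializing
-- "with q1(srcKey, srcValue) as (select * from src where key= '5')"
create temporary table q1(srcKey, srcValue) as
select * from src where key= '5';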
43 changes: 37 additions & 6 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -32,6 +32,7 @@
 import static org.apache.hadoop.hive.ql.ddl.view.create.AbstractCreateViewAnalyzer.validateTablesUsed;
 import static org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter.NON_FK_FILTERED;
 
+import com.google.common.base.Preconditions;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.AccessControlException;
@@ -1574,6 +1575,9 @@ Table materializeCTE(String cteName, CTEClause cte) throws HiveException {
 
     createTable.addChild(tableName);
     createTable.addChild(temporary);
+    if (cte.withColList != null) {
+      createTable.addChild(cte.withColList);
+    }
     createTable.addChild(cte.cteNode);
 
     SemanticAnalyzer analyzer = new SemanticAnalyzer(queryState);
@@ -7831,10 +7835,20 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     if (dpCtx != null) {
       throw new SemanticException("Dynamic partition context has already been created, this should not happen");
     }
-    if (!CollectionUtils.isEmpty(partitionColumnNames)) {
+    if (tblDesc != null && tblDesc.getWithColList() != null && !tblDesc.getWithColList().isEmpty()) {
+      Preconditions.checkState(tblDesc.isMaterialization());
+      if (tblDesc.getWithColList().size() > inputRR.getColumnInfos().size()) {
+        throw new SemanticException(ErrorMsg.WITH_COL_LIST_NUM_OVERFLOW, tblDesc.getFullTableName().getTable(),
+            Integer.toString(inputRR.getColumnInfos().size()), Integer.toString(tblDesc.getWithColList().size()));
+      }
+      ColsAndTypes ct = deriveFileSinkColTypes(inputRR, fieldSchemas, tblDesc.getWithColList());
+      cols = ct.cols;
+      colTypes = ct.colTypes;
+      isPartitioned = false;
+    } else if (!CollectionUtils.isEmpty(partitionColumnNames)) {
       ColsAndTypes ct = deriveFileSinkColTypes(
           inputRR, partitionColumnNames, sortColumnNames, distributeColumnNames, fieldSchemas, partitionColumns,
-          sortColumns, distributeColumns, fileSinkColInfos, sortColInfos, distributeColInfos);
+          sortColumns, distributeColumns, fileSinkColInfos, sortColInfos, distributeColInfos, new ArrayList<>());
       cols = ct.cols;
       colTypes = ct.colTypes;
       dpCtx = new DynamicPartitionCtx(partitionColumnNames,
@@ -8302,13 +8316,21 @@ private ColsAndTypes deriveFileSinkColTypes(RowResolver inputRR, List<String> sortColumnNames,
       List<ColumnInfo> sortColInfos, List<ColumnInfo> distributeColInfos) throws SemanticException {
     return deriveFileSinkColTypes(inputRR, new ArrayList<>(), sortColumnNames, distributeColumnNames,
         fieldSchemas, new ArrayList<>(), sortColumns, distributeColumns, new ArrayList<>(),
-        sortColInfos, distributeColInfos);
+        sortColInfos, distributeColInfos, new ArrayList<>());
   }
 
+  private ColsAndTypes deriveFileSinkColTypes(RowResolver inputRR, List<FieldSchema> fieldSchemas,
+      List<String> withColList) throws SemanticException {
+    return deriveFileSinkColTypes(inputRR, new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), fieldSchemas,
+        new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>(),
+        new ArrayList<>(), withColList);
+  }
+
   private ColsAndTypes deriveFileSinkColTypes(
       RowResolver inputRR, List<String> partitionColumnNames, List<String> sortColumnNames, List<String> distributeColumnNames,
       List<FieldSchema> columns, List<FieldSchema> partitionColumns, List<FieldSchema> sortColumns, List<FieldSchema> distributeColumns,
-      List<ColumnInfo> fileSinkColInfos, List<ColumnInfo> sortColInfos, List<ColumnInfo> distributeColInfos) throws SemanticException {
+      List<ColumnInfo> fileSinkColInfos, List<ColumnInfo> sortColInfos, List<ColumnInfo> distributeColInfos,
+      List<String> withColList) throws SemanticException {
     ColsAndTypes result = new ColsAndTypes("", "");
     List<String> allColumns = new ArrayList<>();
     List<ColumnInfo> colInfos = inputRR.getColumnInfos();
@@ -8321,7 +8343,8 @@ private ColsAndTypes deriveFileSinkColTypes(
     if (numNonPartitionedCols <= 0) {
       throw new SemanticException("Too many partition columns declared");
     }
-    for (ColumnInfo colInfo : colInfos) {
+    for (int i = 0; i < colInfos.size(); i++) {
+      final ColumnInfo colInfo = colInfos.get(i);
       String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
 
       if (nm[1] != null) { // non-null column alias
@@ -8332,7 +8355,9 @@
       String colName = colInfo.getInternalName(); //default column name
       if (columns != null) {
         FieldSchema col = new FieldSchema();
-        if (!("".equals(nm[0])) && nm[1] != null) {
+        if (i < withColList.size()) {
+          colName = withColList.get(i);
+        } else if (!("".equals(nm[0])) && nm[1] != null) {
           colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove ``
         }
         colName = fixCtasColumnName(colName);

[Review comment on this hunk — @okumin (Contributor Author), May 2, 2024]
The input name is used when the with column list is shorter than the input; this follows the behavior of non-materialized CTEs. If I understand correctly, a with column list is supported only when its number of columns is exactly equal to that of the query expression, so it could be an option to disallow colInfos.size() != withColList.size().
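To illustrate the comment above, a hedged sketch reusing src from cte_mat_1.q below; the assumption, implied by the else branch, is that columns beyond the list keep their input names (here, src's second column, value):

-- Only the first output column is relabeled by the one-element list;
-- the second keeps its input name, as with non-materialized CTEs.
with q1(`srcKey`) as (select * from src where key= '5')
select a.srcKey, b.value
from q1 a join q1 b
on a.srcKey=b.srcKey;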
@@ -13952,6 +13977,7 @@ ASTNode analyzeCreateTable(
     boolean isTemporary = false;
     boolean isManaged = false;
     boolean isMaterialization = false;
+    List<String> withColList = new ArrayList<>();
     boolean isTransactional = false;
     ASTNode selectStmt = null;
     final int CREATE_TABLE = 0; // regular CREATE TABLE
@@ -14011,6 +14037,10 @@
         isTemporary = true;
         isMaterialization = MATERIALIZATION_MARKER.equals(child.getText());
         break;
+      case HiveParser.TOK_TABCOLNAME:
+        Preconditions.checkState(isMaterialization);
+        withColList = processTableColumnNames(child, qualifiedTabName.getTable());
+        break;
       case HiveParser.KW_TRANSACTIONAL:
         isTransactional = true;
         command_type = CTT;
@@ -14399,6 +14429,7 @@
         skewedColNames, skewedValues, true, primaryKeys, foreignKeys,
         uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
     tableDesc.setMaterialization(isMaterialization);
+    tableDesc.setWithColList(withColList);
     tableDesc.setStoredAsSubDirectories(storedAsDirs);
     tableDesc.setNullFormat(rowFormatParams.nullFormat);
     qb.setTableDesc(tableDesc);
8 changes: 8 additions & 0 deletions ql/src/test/queries/clientnegative/cte_mat_col_alias.q
@@ -0,0 +1,8 @@
create table t1(int_col int, bigint_col bigint);

set hive.optimize.cte.materialize.threshold=1;
set hive.optimize.cte.materialize.full.aggregate.only=false;

explain cbo
with cte1(a, b, c) as (select int_col x, bigint_col y from t1)
select a, b from cte1;
29 changes: 28 additions & 1 deletion ql/src/test/queries/clientpositive/cte_mat_1.q
@@ -1,10 +1,37 @@
--! qt:dataset:src
set hive.mapred.mode=nonstrict;
set hive.optimize.cte.materialize.threshold=-1;
set hive.explain.user=true;

explain
with q1(srcKey, srcValue) as (select * from src where key= '5')
select a.srcKey, b.srcValue
from q1 a join q1 b
on a.srcKey=b.srcKey;

set hive.optimize.cte.materialize.threshold=2;
set hive.optimize.cte.materialize.full.aggregate.only=false;
-- Use a format that retains column names
set hive.default.fileformat=parquet;
[Review comment on the line above — @okumin (Contributor Author)]
The default file format accesses physical columns positionally, so some kinds of errors cannot be detected.

explain
with q1(srcKey, srcValue) as (select * from src where key= '5')
select a.srcKey, b.srcValue
from q1 a join q1 b
on a.srcKey=b.srcKey;

with q1(srcKey, srcValue) as (select * from src where key= '5')
select a.srcKey, b.srcValue
from q1 a join q1 b
on a.srcKey=b.srcKey;

-- Hive allows <with column list> to have a smaller number of columns than the query expression
explain
with q1(`srcKey`) as (select * from src where key= '5')
select a.srcKey
from q1 a join q1 b
on a.srcKey=b.srcKey;

with q1(`srcKey`) as (select * from src where key= '5')
select a.srcKey
from q1 a join q1 b
on a.srcKey=b.srcKey;
9 changes: 9 additions & 0 deletions ql/src/test/results/clientnegative/cte_mat_col_alias.q.out
@@ -0,0 +1,9 @@
PREHOOK: query: create table t1(int_col int, bigint_col bigint)
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@t1
POSTHOOK: query: create table t1(int_col int, bigint_col bigint)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@t1
FAILED: SemanticException [Error 10425]: WITH-clause query cte1 returns 2 columns, but 3 labels were specified. The number of column labels must be smaller or equal to the number of expressions returned by the query.
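For comparison, a sketch of a variant of the same query that satisfies the new check, since the label count no longer exceeds the two columns returned by the query expression:

-- Two labels for two output columns: accepted.
with cte1(a, b) as (select int_col x, bigint_col y from t1)
select a, b from cte1;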