[FLINK-29337][hive] Fix fail to query non-hive table in Hive dialect
This closes #21034
luoyuxia committed Oct 14, 2022
1 parent 9935b0c commit c8161fd
Showing 14 changed files with 912 additions and 647 deletions.
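
The change makes the Hive dialect resolve tables through Flink's CatalogManager instead of Hive's metastore Table objects, so tables backed by non-hive catalogs and connectors become queryable. Below is a minimal, illustrative sketch of the scenario the fix targets; the table name, the datagen connector options, and the batch setup are assumptions for the example, and the Hive dialect additionally needs the Hive connector/parser dependencies on the classpath.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HiveDialectNonHiveTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Register a table backed by a non-hive connector in the default in-memory catalog.
        tEnv.executeSql(
                "CREATE TABLE src (x INT, y STRING) WITH ("
                        + "'connector' = 'datagen', 'number-of-rows' = '10')");

        // Switch to the Hive dialect and query the non-hive table; failures on this
        // path are what FLINK-29337 addresses.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql("SELECT * FROM src").print();
    }
}
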
@@ -380,7 +380,10 @@ private Operation convertASTNodeToOperation(
if (isLoadData(input)) {
HiveParserLoadSemanticAnalyzer loadSemanticAnalyzer =
new HiveParserLoadSemanticAnalyzer(
hiveConf, frameworkConfig, plannerContext.getCluster());
hiveConf,
frameworkConfig,
plannerContext.getCluster(),
getCatalogManager());
return loadSemanticAnalyzer.convertToOperation(input);
}
if (isMultiDestQuery(input)) {
@@ -447,16 +450,14 @@ private Operation processMultiDestQuery(
}

public HiveParserCalcitePlanner createCalcitePlanner(
HiveParserContext context, HiveParserQueryState queryState, HiveShim hiveShim)
throws SemanticException {
HiveParserContext context, HiveParserQueryState queryState) throws SemanticException {
HiveParserCalcitePlanner calciteAnalyzer =
new HiveParserCalcitePlanner(
queryState,
plannerContext,
catalogReader,
frameworkConfig,
getCatalogManager(),
hiveShim);
getCatalogManager());
calciteAnalyzer.initCtx(context);
calciteAnalyzer.init(false);
return calciteAnalyzer;
@@ -465,11 +466,9 @@ public HiveParserCalcitePlanner createCalcitePlanner(
public void analyzeCreateView(
HiveParserCreateViewInfo createViewInfo,
HiveParserContext context,
HiveParserQueryState queryState,
HiveShim hiveShim)
HiveParserQueryState queryState)
throws SemanticException {
HiveParserCalcitePlanner calciteAnalyzer =
createCalcitePlanner(context, queryState, hiveShim);
HiveParserCalcitePlanner calciteAnalyzer = createCalcitePlanner(context, queryState);
calciteAnalyzer.setCreatViewInfo(createViewInfo);
calciteAnalyzer.genLogicalPlan(createViewInfo.getQuery());
}
@@ -478,7 +477,7 @@ private Operation analyzeSql(
HiveParserContext context, HiveConf hiveConf, HiveShim hiveShim, HiveParserASTNode node)
throws SemanticException {
HiveParserCalcitePlanner analyzer =
createCalcitePlanner(context, new HiveParserQueryState(hiveConf), hiveShim);
createCalcitePlanner(context, new HiveParserQueryState(hiveConf));
RelNode relNode = analyzer.genLogicalPlan(node);
if (relNode == null) {
return new NopOperation();
@@ -18,8 +18,11 @@

package org.apache.flink.table.planner.delegation.hive;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
@@ -30,7 +33,6 @@
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.AggInfo;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.TableType;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserContext;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserJoinTypeCheckCtx;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserNamedJoinInfo;
@@ -110,7 +112,6 @@
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelOptUtil;
import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
@@ -145,6 +146,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@@ -168,7 +170,6 @@
import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getPartitionKeys;
import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getWindowSpecIndx;
import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.initPhase1Ctx;
import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.obtainTableType;
import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.processPositionAlias;
import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.removeOBInSubQuery;
import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.topLevelConjunctCheck;
@@ -205,8 +206,7 @@ public HiveParserCalcitePlanner(
PlannerContext plannerContext,
FlinkCalciteCatalogReader catalogReader,
FrameworkConfig frameworkConfig,
CatalogManager catalogManager,
HiveShim hiveShim)
CatalogManager catalogManager)
throws SemanticException {
this.catalogManager = catalogManager;
this.catalogReader = catalogReader;
@@ -216,7 +216,7 @@
this.hiveConf = queryState.getConf();
this.semanticAnalyzer =
new HiveParserSemanticAnalyzer(
queryState, hiveShim, frameworkConfig, plannerContext.getCluster());
queryState, frameworkConfig, plannerContext.getCluster(), catalogManager);
this.cluster = plannerContext.getCluster();
this.funcConverter =
new SqlFunctionConverter(
@@ -801,66 +801,59 @@ private RelNode genTableLogicalPlan(String tableAlias, HiveParserQB qb)
}

// 2. Get Table Metadata
Table table = qb.getMetaData().getSrcForAlias(tableAlias);
if (table.isTemporary()) {
// Hive creates a temp table for VALUES, we need to convert it to LogicalValues
if (qb.getValuesTableToData().containsKey(tableAlias)) {
// a temp table has been created for VALUES, we need to convert it to LogicalValues
Tuple2<CatalogTable, List<List<String>>> tableValueTuple =
qb.getValuesTableToData().get(tableAlias);
RelNode values =
genValues(
tableAlias,
table,
tableValueTuple.f0,
rowResolver,
cluster,
getQB().getValuesTableToData().get(tableAlias));
tableValueTuple.f1);
relToRowResolver.put(values, rowResolver);
relToHiveColNameCalcitePosMap.put(values, buildHiveToCalciteColumnMap(rowResolver));
return values;
} else {
// 3. Get Table Logical Schema (Row Type)
// NOTE: Table logical schema = Non Partition Cols + Partition Cols + Virtual Cols

// 3.1 Add Column info for non partition cols (Object Inspector fields)
StructObjectInspector rowObjectInspector =
(StructObjectInspector) table.getDeserializer().getObjectInspector();
List<? extends StructField> fields = rowObjectInspector.getAllStructFieldRefs();
Tuple2<String, CatalogTable> nameAndTableTuple =
qb.getMetaData().getSrcForAlias(tableAlias);
String tableName = nameAndTableTuple.f0;
CatalogTable catalogTable = nameAndTableTuple.f1;
TableSchema schema =
HiveParserUtils.fromUnresolvedSchema(catalogTable.getUnresolvedSchema());
String[] fieldNames = schema.getFieldNames();
ColumnInfo colInfo;
String colName;
for (StructField field : fields) {
colName = field.getFieldName();
colInfo =
new ColumnInfo(
field.getFieldName(),
TypeInfoUtils.getTypeInfoFromObjectInspector(
field.getFieldObjectInspector()),
tableAlias,
// 3.1 Add Column info
for (String fieldName : fieldNames) {
Optional<DataType> dataType = schema.getFieldDataType(fieldName);
TypeInfo hiveType =
HiveTypeUtil.toHiveTypeInfo(
dataType.orElseThrow(
() ->
new SemanticException(
String.format(
"Can't get data type for column %s of table %s.",
fieldName, tableName))),
false);
colInfo.setSkewedCol(HiveParserUtils.isSkewedCol(tableAlias, qb, colName));
rowResolver.put(tableAlias, colName, colInfo);
}

// 3.2 Add column info corresponding to partition columns
for (FieldSchema partCol : table.getPartCols()) {
colName = partCol.getName();
colInfo =
new ColumnInfo(
colName,
TypeInfoFactory.getPrimitiveTypeInfo(partCol.getType()),
tableAlias,
true);
rowResolver.put(tableAlias, colName, colInfo);
colInfo = new ColumnInfo(fieldName, hiveType, tableAlias, false);
colInfo.setSkewedCol(HiveParserUtils.isSkewedCol(tableAlias, qb, fieldName));
rowResolver.put(tableAlias, fieldName, colInfo);
}

final TableType tableType = obtainTableType(table);
Preconditions.checkArgument(
tableType == TableType.NATIVE, "Only native tables are supported");
ObjectIdentifier tableIdentifier =
HiveParserBaseSemanticAnalyzer.parseCompoundName(catalogManager, tableName);

// Build Hive Table Scan Rel
RelNode tableRel =
catalogReader
.getTable(
Arrays.asList(
catalogManager.getCurrentCatalog(),
table.getDbName(),
table.getTableName()))
tableIdentifier.getCatalogName(),
tableIdentifier.getDatabaseName(),
tableIdentifier.getObjectName()))
.toRel(
ViewExpanders.toRelContext(
flinkPlanner.createToRelContext(), cluster));
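
In the genTableLogicalPlan hunk above, column info is now derived from the CatalogTable's schema rather than from the Hive deserializer's ObjectInspector: each Flink DataType is mapped to a Hive TypeInfo via HiveTypeUtil.toHiveTypeInfo before being put into the row resolver. A standalone sketch of that conversion, for illustration only and assuming flink-connector-hive on the classpath:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class FlinkToHiveTypeSketch {
    public static void main(String[] args) {
        // Map a Flink DataType to the Hive TypeInfo used when building ColumnInfo;
        // the boolean flag mirrors the 'false' the planner passes above.
        DataType flinkType = DataTypes.DECIMAL(10, 2);
        TypeInfo hiveType = HiveTypeUtil.toHiveTypeInfo(flinkType, false);
        System.out.println(hiveType.getTypeName()); // prints "decimal(10,2)"
    }
}
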
@@ -1029,7 +1022,7 @@ private boolean genSubQueryRelNode(
HiveParserBaseSemanticAnalyzer.Phase1Ctx ctx1 = initPhase1Ctx();
semanticAnalyzer.doPhase1(
(HiveParserASTNode) next.getChild(1), subQB, ctx1, null);
semanticAnalyzer.getMetaData(subQB);
semanticAnalyzer.getMetaData(subQB, false);
RelNode subQueryRelNode =
genLogicalPlan(
subQB,