diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java index c8d1630133e4b..cfee6c9f00ce1 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java @@ -380,7 +380,10 @@ private Operation convertASTNodeToOperation( if (isLoadData(input)) { HiveParserLoadSemanticAnalyzer loadSemanticAnalyzer = new HiveParserLoadSemanticAnalyzer( - hiveConf, frameworkConfig, plannerContext.getCluster()); + hiveConf, + frameworkConfig, + plannerContext.getCluster(), + getCatalogManager()); return loadSemanticAnalyzer.convertToOperation(input); } if (isMultiDestQuery(input)) { @@ -447,16 +450,14 @@ private Operation processMultiDestQuery( } public HiveParserCalcitePlanner createCalcitePlanner( - HiveParserContext context, HiveParserQueryState queryState, HiveShim hiveShim) - throws SemanticException { + HiveParserContext context, HiveParserQueryState queryState) throws SemanticException { HiveParserCalcitePlanner calciteAnalyzer = new HiveParserCalcitePlanner( queryState, plannerContext, catalogReader, frameworkConfig, - getCatalogManager(), - hiveShim); + getCatalogManager()); calciteAnalyzer.initCtx(context); calciteAnalyzer.init(false); return calciteAnalyzer; @@ -465,11 +466,9 @@ public HiveParserCalcitePlanner createCalcitePlanner( public void analyzeCreateView( HiveParserCreateViewInfo createViewInfo, HiveParserContext context, - HiveParserQueryState queryState, - HiveShim hiveShim) + HiveParserQueryState queryState) throws SemanticException { - HiveParserCalcitePlanner calciteAnalyzer = - createCalcitePlanner(context, queryState, hiveShim); + HiveParserCalcitePlanner calciteAnalyzer = createCalcitePlanner(context, queryState); calciteAnalyzer.setCreatViewInfo(createViewInfo); calciteAnalyzer.genLogicalPlan(createViewInfo.getQuery()); } @@ -478,7 +477,7 @@ private Operation analyzeSql( HiveParserContext context, HiveConf hiveConf, HiveShim hiveShim, HiveParserASTNode node) throws SemanticException { HiveParserCalcitePlanner analyzer = - createCalcitePlanner(context, new HiveParserQueryState(hiveConf), hiveShim); + createCalcitePlanner(context, new HiveParserQueryState(hiveConf)); RelNode relNode = analyzer.genLogicalPlan(node); if (relNode == null) { return new NopOperation(); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java index 4244dbde9805a..992d5af27d76e 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java @@ -18,8 +18,11 @@ package org.apache.flink.table.planner.delegation.hive; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.CatalogTable; +import 
org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; import org.apache.flink.table.functions.hive.conversion.HiveInspectors; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; @@ -30,7 +33,6 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.AggInfo; -import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.TableType; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserContext; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserJoinTypeCheckCtx; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserNamedJoinInfo; @@ -110,7 +112,6 @@ import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.lib.Node; -import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelOptUtil; import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; @@ -145,6 +146,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -168,7 +170,6 @@ import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getPartitionKeys; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getWindowSpecIndx; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.initPhase1Ctx; -import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.obtainTableType; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.processPositionAlias; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.removeOBInSubQuery; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.topLevelConjunctCheck; @@ -205,8 +206,7 @@ public HiveParserCalcitePlanner( PlannerContext plannerContext, FlinkCalciteCatalogReader catalogReader, FrameworkConfig frameworkConfig, - CatalogManager catalogManager, - HiveShim hiveShim) + CatalogManager catalogManager) throws SemanticException { this.catalogManager = catalogManager; this.catalogReader = catalogReader; @@ -216,7 +216,7 @@ public HiveParserCalcitePlanner( this.hiveConf = queryState.getConf(); this.semanticAnalyzer = new HiveParserSemanticAnalyzer( - queryState, hiveShim, frameworkConfig, plannerContext.getCluster()); + queryState, frameworkConfig, plannerContext.getCluster(), catalogManager); this.cluster = plannerContext.getCluster(); this.funcConverter = new SqlFunctionConverter( @@ -801,66 +801,59 @@ private RelNode genTableLogicalPlan(String tableAlias, HiveParserQB qb) } // 2. 
Get Table Metadata - Table table = qb.getMetaData().getSrcForAlias(tableAlias); - if (table.isTemporary()) { - // Hive creates a temp table for VALUES, we need to convert it to LogicalValues + if (qb.getValuesTableToData().containsKey(tableAlias)) { + // a temp table has been created for VALUES, we need to convert it to LogicalValues + Tuple2>> tableValueTuple = + qb.getValuesTableToData().get(tableAlias); RelNode values = genValues( tableAlias, - table, + tableValueTuple.f0, rowResolver, cluster, - getQB().getValuesTableToData().get(tableAlias)); + tableValueTuple.f1); relToRowResolver.put(values, rowResolver); relToHiveColNameCalcitePosMap.put(values, buildHiveToCalciteColumnMap(rowResolver)); return values; } else { // 3. Get Table Logical Schema (Row Type) // NOTE: Table logical schema = Non Partition Cols + Partition Cols + Virtual Cols - - // 3.1 Add Column info for non partition cols (Object Inspector fields) - StructObjectInspector rowObjectInspector = - (StructObjectInspector) table.getDeserializer().getObjectInspector(); - List fields = rowObjectInspector.getAllStructFieldRefs(); + Tuple2 nameAndTableTuple = + qb.getMetaData().getSrcForAlias(tableAlias); + String tableName = nameAndTableTuple.f0; + CatalogTable catalogTable = nameAndTableTuple.f1; + TableSchema schema = + HiveParserUtils.fromUnresolvedSchema(catalogTable.getUnresolvedSchema()); + String[] fieldNames = schema.getFieldNames(); ColumnInfo colInfo; - String colName; - for (StructField field : fields) { - colName = field.getFieldName(); - colInfo = - new ColumnInfo( - field.getFieldName(), - TypeInfoUtils.getTypeInfoFromObjectInspector( - field.getFieldObjectInspector()), - tableAlias, + // 3.1 Add Column info + for (String fieldName : fieldNames) { + Optional dataType = schema.getFieldDataType(fieldName); + TypeInfo hiveType = + HiveTypeUtil.toHiveTypeInfo( + dataType.orElseThrow( + () -> + new SemanticException( + String.format( + "Can't get data type for column %s of table %s.", + fieldName, tableName))), false); - colInfo.setSkewedCol(HiveParserUtils.isSkewedCol(tableAlias, qb, colName)); - rowResolver.put(tableAlias, colName, colInfo); - } - - // 3.2 Add column info corresponding to partition columns - for (FieldSchema partCol : table.getPartCols()) { - colName = partCol.getName(); - colInfo = - new ColumnInfo( - colName, - TypeInfoFactory.getPrimitiveTypeInfo(partCol.getType()), - tableAlias, - true); - rowResolver.put(tableAlias, colName, colInfo); + colInfo = new ColumnInfo(fieldName, hiveType, tableAlias, false); + colInfo.setSkewedCol(HiveParserUtils.isSkewedCol(tableAlias, qb, fieldName)); + rowResolver.put(tableAlias, fieldName, colInfo); } - final TableType tableType = obtainTableType(table); - Preconditions.checkArgument( - tableType == TableType.NATIVE, "Only native tables are supported"); + ObjectIdentifier tableIdentifier = + HiveParserBaseSemanticAnalyzer.parseCompoundName(catalogManager, tableName); // Build Hive Table Scan Rel RelNode tableRel = catalogReader .getTable( Arrays.asList( - catalogManager.getCurrentCatalog(), - table.getDbName(), - table.getTableName())) + tableIdentifier.getCatalogName(), + tableIdentifier.getDatabaseName(), + tableIdentifier.getObjectName())) .toRel( ViewExpanders.toRelContext( flinkPlanner.createToRelContext(), cluster)); @@ -1029,7 +1022,7 @@ private boolean genSubQueryRelNode( HiveParserBaseSemanticAnalyzer.Phase1Ctx ctx1 = initPhase1Ctx(); semanticAnalyzer.doPhase1( (HiveParserASTNode) next.getChild(1), subQB, ctx1, null); - semanticAnalyzer.getMetaData(subQB); + 
semanticAnalyzer.getMetaData(subQB, false); RelNode subQueryRelNode = genLogicalPlan( subQB, diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java index 280742dc1fa40..fa886195821da 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java @@ -18,24 +18,28 @@ package org.apache.flink.table.planner.delegation.hive; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogPropertiesUtil; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.catalog.UnresolvedIdentifier; -import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; +import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.module.hive.udf.generic.HiveGenericUDFToDecimal; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.SinkModifyOperation; import org.apache.flink.table.planner.delegation.PlannerContext; +import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserDirectoryDesc; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserQB; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserSqlFunctionConverter; @@ -44,6 +48,7 @@ import org.apache.flink.table.planner.operations.PlannerQueryOperation; import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution; import org.apache.flink.table.planner.plan.nodes.hive.LogicalScriptTransform; +import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; import org.apache.calcite.rel.RelCollation; @@ -63,11 +68,8 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlOperator; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; -import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.QBMetaData; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.udf.SettableUDF; @@ -83,6 +85,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import 
java.util.stream.IntStream; @@ -110,7 +113,8 @@ public HiveParserDMLHelper( public Tuple4, Boolean> createInsertOperationInfo( RelNode queryRelNode, - Table destTable, + CatalogTable destTable, + ObjectIdentifier destTableIdentifier, Map staticPartSpec, List destSchema, boolean overwrite) @@ -150,13 +154,25 @@ public HiveParserDMLHelper( RelDataTypeFactory typeFactory = plannerContext.getTypeFactory(); LinkedHashMap targetColToCalcType = new LinkedHashMap<>(); List targetHiveTypes = new ArrayList<>(); - List allCols = new ArrayList<>(destTable.getCols()); - allCols.addAll(destTable.getPartCols()); - for (FieldSchema col : allCols) { - TypeInfo hiveType = TypeInfoUtils.getTypeInfoFromTypeString(col.getType()); + TableSchema tableSchema = + HiveParserUtils.fromUnresolvedSchema(destTable.getUnresolvedSchema()); + String[] fieldNames = tableSchema.getFieldNames(); + for (String fieldName : fieldNames) { + Optional dataType = tableSchema.getFieldDataType(fieldName); + TypeInfo hiveType = + HiveTypeUtil.toHiveTypeInfo( + dataType.orElseThrow( + () -> + new SemanticException( + String.format( + "Can't get data type for column %s of table %s.", + fieldName, + destTableIdentifier + .asSummaryString()))), + false); targetHiveTypes.add(hiveType); targetColToCalcType.put( - col.getName(), HiveParserTypeConverter.convert(hiveType, typeFactory)); + fieldName, HiveParserTypeConverter.convert(hiveType, typeFactory)); } // add static partitions to query source @@ -185,7 +201,7 @@ public HiveParserDMLHelper( oldInput, staticPartSpec, destTable, targetColToCalcType); // we may need to shift the field collations final int numDynmPart = - destTable.getTTable().getPartitionKeys().size() - staticPartSpec.size(); + destTable.getPartitionKeys().size() - staticPartSpec.size(); if (!sort.getCollation().getFieldCollations().isEmpty() && numDynmPart > 0) { sort.replaceInput(0, null); sort = @@ -229,35 +245,39 @@ public HiveParserDMLHelper( funcConverter, false); - // create identifier - List targetTablePath = - Arrays.asList(destTable.getDbName(), destTable.getTableName()); - UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(targetTablePath); - ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); - return Tuple4.of( - identifier, new PlannerQueryOperation(queryRelNode), staticPartSpec, overwrite); + destTableIdentifier, + new PlannerQueryOperation(queryRelNode), + staticPartSpec, + overwrite); } public Operation createInsertOperation(HiveParserCalcitePlanner analyzer, RelNode queryRelNode) throws SemanticException { HiveParserQB topQB = analyzer.getQB(); - QBMetaData qbMetaData = topQB.getMetaData(); + HiveParserQBMetaData qbMetaData = topQB.getMetaData(); // decide the dest table - Map nameToDestTable = qbMetaData.getNameToDestTable(); - Map nameToDestPart = qbMetaData.getNameToDestPartition(); + Map> nameToDestTable = qbMetaData.getNameToDestTable(); + Map> nameToDestPart = + qbMetaData.getNameToDestPartition(); // for now we only support inserting to a single table in one queryRelNode Preconditions.checkState( nameToDestTable.size() <= 1 && nameToDestPart.size() <= 1, "Only support inserting to 1 table"); - Table destTable; + CatalogTable destTable; String insClauseName; + String tableName; if (!nameToDestTable.isEmpty()) { insClauseName = nameToDestTable.keySet().iterator().next(); - destTable = nameToDestTable.values().iterator().next(); + Tuple2 nameTable = nameToDestTable.values().iterator().next(); + tableName = nameTable.f0; + destTable = nameTable.f1; } else 
if (!nameToDestPart.isEmpty()) { insClauseName = nameToDestPart.keySet().iterator().next(); - destTable = nameToDestPart.values().iterator().next().getTable(); + Tuple3 nameTable = + nameToDestPart.values().iterator().next(); + tableName = nameTable.f0; + destTable = nameTable.f1; } else { // happens for INSERT DIRECTORY return createInsertIntoDirectoryOperation(topQB, qbMetaData, queryRelNode); @@ -266,17 +286,16 @@ public Operation createInsertOperation(HiveParserCalcitePlanner analyzer, RelNod // decide static partition specs Map staticPartSpec = new LinkedHashMap<>(); if (destTable.isPartitioned()) { - List partCols = - HiveCatalog.getFieldNames(destTable.getTTable().getPartitionKeys()); + List partCols = destTable.getPartitionKeys(); if (!nameToDestPart.isEmpty()) { // static partition - Partition destPart = nameToDestPart.values().iterator().next(); + CatalogPartitionSpec destPart = nameToDestPart.values().iterator().next().f2; Preconditions.checkState( - partCols.size() == destPart.getValues().size(), + partCols.size() == destPart.getPartitionSpec().size(), "Part cols and static spec doesn't match"); - for (int i = 0; i < partCols.size(); i++) { - staticPartSpec.put(partCols.get(i), destPart.getValues().get(i)); + for (String partCol : partCols) { + staticPartSpec.put(partCol, destPart.getPartitionSpec().get(partCol)); } } else { // dynamic partition @@ -297,12 +316,13 @@ public Operation createInsertOperation(HiveParserCalcitePlanner analyzer, RelNod topQB.getParseInfo().getInsertOverwriteTables().keySet().stream() .map(String::toLowerCase) .collect(Collectors.toSet()) - .contains(destTable.getDbName() + "." + destTable.getTableName()); + .contains(tableName); Tuple4, Boolean> insertOperationInfo = createInsertOperationInfo( queryRelNode, destTable, + HiveParserBaseSemanticAnalyzer.parseCompoundName(catalogManager, tableName), staticPartSpec, analyzer.getDestSchemaForClause(insClauseName), overwrite); @@ -316,7 +336,7 @@ public Operation createInsertOperation(HiveParserCalcitePlanner analyzer, RelNod } private SinkModifyOperation createInsertIntoDirectoryOperation( - HiveParserQB topQB, QBMetaData qbMetaData, RelNode queryRelNode) { + HiveParserQB topQB, HiveParserQBMetaData qbMetaData, RelNode queryRelNode) { String dest = topQB.getParseInfo().getClauseNamesForDest().iterator().next(); // get the location for insert into directory String location = qbMetaData.getDestFileForAlias(dest); @@ -390,7 +410,7 @@ private ContextResolvedTable createDummyTableForInsertDirectory( private RelNode replaceDistForStaticParts( LogicalDistribution hiveDist, - Table destTable, + CatalogTable destTable, Map staticPartSpec, Map targetColToType) throws SemanticException { @@ -400,7 +420,7 @@ private RelNode replaceDistForStaticParts( hiveDist.getInput(), staticPartSpec, destTable, targetColToType); hiveDist.replaceInput(0, null); final int toShift = staticPartSpec.size(); - final int numDynmPart = destTable.getTTable().getPartitionKeys().size() - toShift; + final int numDynmPart = destTable.getPartitionKeys().size() - toShift; return LogicalDistribution.create( expandedProject, shiftRelCollation(hiveDist.getCollation(), originInput, toShift, numDynmPart), @@ -648,7 +668,7 @@ private static RelNode addProjectForTypeConversion( private RelNode handleDestSchema( SingleRel queryRelNode, - Table destTable, + CatalogTable destTable, List destSchema, Set staticParts) throws SemanticException { @@ -657,28 +677,38 @@ private RelNode handleDestSchema( } // natural schema should contain regular cols + dynamic 
cols - List naturalSchema = new ArrayList<>(destTable.getCols()); - if (destTable.isPartitioned()) { - naturalSchema.addAll( - destTable.getTTable().getPartitionKeys().stream() - .filter(f -> !staticParts.contains(f.getName())) - .collect(Collectors.toList())); + TableSchema tableSchema = + HiveParserUtils.fromUnresolvedSchema(destTable.getUnresolvedSchema()); + List naturalSchema = new ArrayList<>(); + for (String fieldName : tableSchema.getFieldNames()) { + // only add no partition cols and dynamic partition cols + if (!staticParts.contains(fieldName)) { + naturalSchema.add(fieldName); + } } // we don't need to do anything if the dest schema is the same as natural schema - if (destSchema.equals(HiveCatalog.getFieldNames(naturalSchema))) { + if (destSchema.equals(naturalSchema)) { return queryRelNode; } // build a list to create a Project on top of original Project // for each col in dest table, if it's in dest schema, store its corresponding index in the // dest schema, otherwise store its type and we'll create NULL for it List updatedIndices = new ArrayList<>(naturalSchema.size()); - for (FieldSchema col : naturalSchema) { - int index = destSchema.indexOf(col.getName()); + for (String col : naturalSchema) { + int index = destSchema.indexOf(col); if (index < 0) { + Optional dataType = tableSchema.getFieldDataType(col); + TypeInfo hiveType = + HiveTypeUtil.toHiveTypeInfo( + dataType.orElseThrow( + () -> + new SemanticException( + String.format( + "Can't get data type for column %s.", + col))), + false); updatedIndices.add( - HiveParserTypeConverter.convert( - TypeInfoUtils.getTypeInfoFromTypeString(col.getType()), - plannerContext.getTypeFactory())); + HiveParserTypeConverter.convert(hiveType, plannerContext.getTypeFactory())); } else { updatedIndices.add(index); } @@ -769,7 +799,7 @@ private List updateDistKeys(List distKeys, List update private RelNode replaceOrAddProjectForStaticPart( RelNode relNode, Map staticPartSpec, - Table destTable, + CatalogTable destTable, Map targetColToType) throws SemanticException { if (relNode instanceof Project) { @@ -790,7 +820,7 @@ private RelNode replaceOrAddProjectForStaticPart( private RelNode replaceProjectForStaticPart( Project project, Map staticPartSpec, - Table destTable, + CatalogTable destTable, Map targetColToType) { List extendedExprs = addExtraRexNodeForStaticPart( @@ -808,7 +838,7 @@ private RelNode replaceProjectForStaticPart( private Project addProjectForStaticPart( LogicalScriptTransform logicalScriptTransform, Map staticPartSpec, - Table destTable, + CatalogTable destTable, Map targetColToType) { RexBuilder rexBuilder = plannerContext.getCluster().getRexBuilder(); List originRexNodes = @@ -828,11 +858,11 @@ private Project addProjectForStaticPart( private List addExtraRexNodeForStaticPart( List originRexNodes, Map staticPartSpec, - Table destTable, + CatalogTable destTable, Map targetColToType) { List extendedRexNodes = new ArrayList<>(originRexNodes); RexBuilder rexBuilder = plannerContext.getCluster().getRexBuilder(); - int numDynmPart = destTable.getTTable().getPartitionKeys().size() - staticPartSpec.size(); + int numDynmPart = destTable.getPartitionKeys().size() - staticPartSpec.size(); int insertIndex = originRexNodes.size() - numDynmPart; for (Map.Entry spec : staticPartSpec.entrySet()) { RexNode toAdd = diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserQBMetaData.java 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserQBMetaData.java
new file mode 100644
index 0000000000000..1bde577c07523
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserQBMetaData.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Counterpart of hive's org.apache.hadoop.hive.ql.parse.QBMetaData. */
+public class HiveParserQBMetaData {
+    public static final int DEST_TABLE = 1;
+    // alias -> <table name, CatalogTable>
+    private final Map<String, Tuple2<String, CatalogTable>> aliasToTable = new LinkedHashMap<>();
+    private final Map<String, Tuple2<String, CatalogTable>> nameToDestTable = new HashMap<>();
+    private final Map<String, Tuple3<String, CatalogTable, CatalogPartitionSpec>>
+            nameToDestPartition = new HashMap<>();
+    private final Map<String, String> nameToDestFile = new HashMap<>();
+    private final Map<String, Integer> nameToDestType = new HashMap<>();
+    private final Map<String, Map<String, String>> aliasToPartSpec = new LinkedHashMap<>();
+
+    public HiveParserQBMetaData() {}
+
+    public void setSrcForAlias(String alias, String tabName, CatalogTable tab) {
+        this.aliasToTable.put(alias, Tuple2.of(tabName, tab));
+    }
+
+    public void setDestForAlias(String alias, String tabName, CatalogTable tab) {
+        this.nameToDestType.put(alias, 1);
+        this.nameToDestTable.put(alias, Tuple2.of(tabName, tab));
+    }
+
+    public void setDestForAlias(
+            String alias, String tabName, CatalogTable tab, CatalogPartitionSpec part) {
+        this.nameToDestType.put(alias, 2);
+        this.nameToDestPartition.put(alias, Tuple3.of(tabName, tab, part));
+    }
+
+    public void setDestForAlias(String alias, String fname, boolean isDfsFile) {
+        this.nameToDestType.put(alias, isDfsFile ? 3 : 5);
+        this.nameToDestFile.put(alias, fname);
+    }
+
+    public Map<String, Tuple2<String, CatalogTable>> getNameToDestTable() {
+        return this.nameToDestTable;
+    }
+
+    public String getDestFileForAlias(String alias) {
+        return nameToDestFile.get(alias.toLowerCase());
+    }
+
+    public CatalogPartitionSpec getDestPartitionForAlias(String alias) {
+        return this.nameToDestPartition.get(alias.toLowerCase()).f2;
+    }
+
+    public Map<String, Tuple3<String, CatalogTable, CatalogPartitionSpec>>
+            getNameToDestPartition() {
+        return this.nameToDestPartition;
+    }
+
+    public Tuple2<String, CatalogTable> getSrcForAlias(String alias) {
+        return this.aliasToTable.get(alias.toLowerCase());
+    }
+
+    public Map<String, String> getPartSpecForAlias(String alias) {
+        return this.aliasToPartSpec.get(alias);
+    }
+
+    public void setPartSpecForAlias(String alias, Map<String, String> partSpec) {
+        this.aliasToPartSpec.put(alias, partSpec);
+    }
+
+    public Integer getDestTypeForAlias(String alias) {
+        return nameToDestType.get(alias.toLowerCase());
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
index 3356410114dfe..04757e2d58948 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
@@ -19,12 +19,17 @@
 package org.apache.flink.table.planner.delegation.hive;
 
 import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.functions.hive.HiveGenericUDAF;
 import org.apache.flink.table.module.hive.udf.generic.GenericUDFLegacyGroupingID;
@@ -1689,4 +1694,61 @@ public static boolean isFromTimeStampToDecimal(RelDataType srcType, RelDataType
         return srcType.getSqlTypeName().equals(SqlTypeName.TIMESTAMP)
                 && targetType.getSqlTypeName().equals(SqlTypeName.DECIMAL);
     }
+
+    /**
+     * Helps bridge the new {@link Schema} to the old APIs. HiveCatalog still uses the deprecated
+     * {@link TableSchema} while other catalogs may use the new {@link Schema}, so we currently
+     * unify them to {@link TableSchema}. Drop this once {@link TableSchema} itself is dropped.
+ */ + public static TableSchema fromUnresolvedSchema(Schema schema) { + final TableSchema.Builder builder = TableSchema.builder(); + + final DataType unresolvedType = DataTypes.TIMESTAMP(3); + schema.getColumns().stream() + .map( + column -> { + if (column instanceof Schema.UnresolvedPhysicalColumn) { + final Schema.UnresolvedPhysicalColumn c = + (Schema.UnresolvedPhysicalColumn) column; + return TableColumn.physical( + c.getName(), (DataType) c.getDataType()); + } else if (column instanceof Schema.UnresolvedMetadataColumn) { + final Schema.UnresolvedMetadataColumn c = + (Schema.UnresolvedMetadataColumn) column; + return TableColumn.metadata( + c.getName(), + (DataType) c.getDataType(), + c.getMetadataKey(), + c.isVirtual()); + } else if (column instanceof Schema.UnresolvedComputedColumn) { + final Schema.UnresolvedComputedColumn c = + (Schema.UnresolvedComputedColumn) column; + return TableColumn.computed( + c.getName(), + unresolvedType, + ((SqlCallExpression) c.getExpression()).getSqlExpression()); + } + throw new IllegalArgumentException( + "Unsupported column type: " + column); + }) + .forEach(builder::add); + + schema.getWatermarkSpecs() + .forEach( + spec -> + builder.watermark( + spec.getColumnName(), + ((SqlCallExpression) spec.getWatermarkExpression()) + .getSqlExpression(), + unresolvedType)); + + schema.getPrimaryKey() + .ifPresent( + pk -> + builder.primaryKey( + pk.getConstraintName(), + pk.getColumnNames().toArray(new String[0]))); + + return builder.build(); + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java index b502bc0143e75..b8694fdbb6dd1 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java @@ -19,6 +19,13 @@ package org.apache.flink.table.planner.delegation.hive.copy; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; import org.apache.flink.table.planner.delegation.hive.HiveParserConstants; import org.apache.flink.table.planner.delegation.hive.HiveParserRexNodeConverter; import org.apache.flink.table.planner.delegation.hive.HiveParserTypeCheckProcFactory; @@ -32,6 +39,7 @@ import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg; +import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; import org.antlr.runtime.tree.Tree; @@ -63,7 +71,6 @@ import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.util.ImmutableBitSet; -import org.apache.commons.lang3.StringUtils; import 
org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; @@ -73,10 +80,6 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.Node; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.InvalidTableException; -import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; @@ -103,6 +106,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; @@ -117,9 +122,12 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.apache.flink.table.planner.delegation.hive.HiveParserUtils.removeASTChild; import static org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer.encodeRowFormat; @@ -218,7 +226,6 @@ public static List getColumns( int numCh = ast.getChildCount(); List pkInfos = new ArrayList<>(); Map nametoFS = new HashMap<>(); - Tree parent = ast.getParent(); for (int i = 0; i < numCh; i++) { FieldSchema col = new FieldSchema(); @@ -255,16 +262,9 @@ public static List getColumns( constraintChild = (HiveParserASTNode) child.getChild(2); } if (constraintChild != null) { - String[] qualifiedTabName = - getQualifiedTableName((HiveParserASTNode) parent.getChild(0)); switch (constraintChild.getToken().getType()) { case HiveASTParser.TOK_NOT_NULL: - notNulls.add( - processNotNull( - constraintChild, - qualifiedTabName[0], - qualifiedTabName[1], - col.getName())); + notNulls.add(processNotNull(constraintChild, col.getName())); break; default: throw new SemanticException( @@ -277,13 +277,12 @@ public static List getColumns( } } if (!pkInfos.isEmpty()) { - processPrimaryKeys((HiveParserASTNode) parent, pkInfos, primaryKeys, nametoFS); + processPrimaryKeys(pkInfos, primaryKeys, nametoFS); } return colList; } - private static NotNullConstraint processNotNull( - HiveParserASTNode nnNode, String dbName, String tblName, String colName) + private static NotNullConstraint processNotNull(HiveParserASTNode nnNode, String colName) throws SemanticException { boolean enable = true; boolean validate = false; @@ -309,30 +308,17 @@ private static NotNullConstraint processNotNull( "Unexpected node for NOT NULL constraint: " + child); } } - return new NotNullConstraint(dbName, tblName, colName, null, enable, validate, rely); + return new NotNullConstraint(colName, null, enable, validate, rely); } private static void processPrimaryKeys( - HiveParserASTNode parent, - List pkInfos, - List primaryKeys, - Map nametoFS) + List pkInfos, List primaryKeys, Map nametoFS) throws SemanticException { - int cnt = 1; - String[] qualifiedTabName = getQualifiedTableName((HiveParserASTNode) parent.getChild(0)); - for (PKInfo pkInfo : pkInfos) { String pk = pkInfo.colName; if (nametoFS.containsKey(pk)) { PrimaryKey currPrimaryKey = - new PrimaryKey( - qualifiedTabName[0], - qualifiedTabName[1], - pk, - pkInfo.constraintName, - 
false, - false, - pkInfo.rely); + new PrimaryKey(pk, pkInfo.constraintName, false, false, pkInfo.rely); primaryKeys.add(currPrimaryKey); } else { throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(pk)); @@ -403,12 +389,8 @@ private static void checkColumnName(String columnName) throws SemanticException } } - public static String getDotName(String[] qname) throws SemanticException { - String genericName = StringUtils.join(qname, "."); - if (qname.length != 2) { - throw new SemanticException(ErrorMsg.INVALID_TABLE_NAME, genericName); - } - return genericName; + public static String getDotName(String... names) { + return Stream.of(names).filter(Objects::nonNull).collect(Collectors.joining(".")); } /** @@ -429,20 +411,44 @@ public static void readProps(HiveParserASTNode prop, Map mapProp } } - public static String[] getQualifiedTableName(HiveParserASTNode tabNameNode) + public static ObjectIdentifier getObjectIdentifier( + CatalogManager catalogManager, HiveParserASTNode tabNameNode) throws SemanticException { + UnresolvedIdentifier qualifiedTableName = getQualifiedTableName(tabNameNode); + return catalogManager.qualifyIdentifier(qualifiedTableName); + } + + public static ObjectIdentifier parseCompoundName( + CatalogManager catalogManager, String compoundName) { + String[] names = compoundName.split("\\."); + return catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(names)); + } + + public static UnresolvedIdentifier getQualifiedTableName(HiveParserASTNode tabNameNode) throws SemanticException { - if (tabNameNode.getType() != HiveASTParser.TOK_TABNAME - || (tabNameNode.getChildCount() != 1 && tabNameNode.getChildCount() != 2)) { + if (tabNameNode.getType() != HiveASTParser.TOK_TABNAME) { throw new SemanticException( HiveParserErrorMsg.getMsg(ErrorMsg.INVALID_TABLE_NAME, tabNameNode)); } - if (tabNameNode.getChildCount() == 2) { - String dbName = unescapeIdentifier(tabNameNode.getChild(0).getText()); - String tableName = unescapeIdentifier(tabNameNode.getChild(1).getText()); - return new String[] {dbName, tableName}; + String catalogName; + String dbName; + String tableName; + switch (tabNameNode.getChildCount()) { + case 1: + tableName = unescapeIdentifier(tabNameNode.getChild(0).getText()); + return UnresolvedIdentifier.of(tableName); + case 2: + dbName = unescapeIdentifier(tabNameNode.getChild(0).getText()); + tableName = unescapeIdentifier(tabNameNode.getChild(1).getText()); + return UnresolvedIdentifier.of(dbName, tableName); + case 3: + catalogName = unescapeIdentifier(tabNameNode.getChild(0).getText()); + dbName = unescapeIdentifier(tabNameNode.getChild(1).getText()); + tableName = unescapeIdentifier(tabNameNode.getChild(2).getText()); + return UnresolvedIdentifier.of(catalogName, dbName, tableName); + default: + throw new SemanticException( + HiveParserErrorMsg.getMsg(ErrorMsg.INVALID_TABLE_NAME, tabNameNode)); } - String tableName = unescapeIdentifier(tabNameNode.getChild(0).getText()); - return Utilities.getDbTableName(tableName); } public static Tuple2 charSetString(String charSetName, String charSetString) @@ -499,19 +505,18 @@ public static String unescapeIdentifier(String val) { /** * Get the unqualified name from a table node. This method works for table names qualified with - * their schema (e.g., "db.table") and table names without schema qualification. In both cases, - * it returns the table name without the schema. + * their schema (e.g., "catalog.db.table") and table names without schema qualification. 
In both + * cases, it returns the table name without the schema. * * @param node the table node - * @return the table name without schema qualification (i.e., if name is "db.table" or "table", - * returns "table") + * @return the table name without schema qualification (i.e., if name is "catalog.db.table" or + * "table", returns "table") */ - public static String getUnescapedUnqualifiedTableName(HiveParserASTNode node) { - assert node.getChildCount() <= 2; + public static String getUnescapedUnqualifiedTableName(HiveParserASTNode node) + throws SemanticException { + assert node.getChildCount() <= 3; - if (node.getChildCount() == 2) { - node = (HiveParserASTNode) node.getChild(1); - } + node = (HiveParserASTNode) node.getChild(node.getChildCount() - 1); return getUnescapedName(node); } @@ -520,10 +525,48 @@ public static String getUnescapedUnqualifiedTableName(HiveParserASTNode node) { * Get dequoted name from a table/column node. * * @param tableOrColumnNode the table or column node - * @return for table node, db.tab or tab. for column node column. + * @return for table node, return the table that users specific like catalog.db.tab, db.tab or + * tab. For column node column, return col. + */ + public static String getUnescapedName(HiveParserASTNode tableOrColumnNode) + throws SemanticException { + return getUnescapedName(tableOrColumnNode, null, null); + } + + public static String getUnescapedName( + HiveParserASTNode tableOrColumnNode, + @Nullable String currentCatalog, + @Nullable String currentDatabase) + throws SemanticException { + int tokenType = tableOrColumnNode.getToken().getType(); + if (tokenType == HiveASTParser.TOK_TABNAME) { + // table node + UnresolvedIdentifier tableIdentifier = getQualifiedTableName(tableOrColumnNode); + return getDotName( + tableIdentifier.getCatalogName().orElse(currentCatalog), + tableIdentifier.getDatabaseName().orElse(currentDatabase), + tableIdentifier.getObjectName()); + } else if (tokenType == HiveASTParser.StringLiteral) { + return unescapeSQLString(tableOrColumnNode.getText()); + } + // column node + return unescapeIdentifier(tableOrColumnNode.getText()); + } + + /** + * Get the unescaped origin table name for the table node. 
This method returns + * "catalog.db.table","db.table" or "table" according to what the table node actually specifies + * + * @param node the table node + * @return "catalog.db.table", "db.table" or "table" */ - public static String getUnescapedName(HiveParserASTNode tableOrColumnNode) { - return getUnescapedName(tableOrColumnNode, null); + public static String getUnescapedOriginTableName(HiveParserASTNode node) + throws SemanticException { + UnresolvedIdentifier tableIdentifier = getQualifiedTableName(node); + return getDotName( + tableIdentifier.getCatalogName().orElse(null), + tableIdentifier.getDatabaseName().orElse(null), + tableIdentifier.getObjectName()); } public static String getUnescapedName( @@ -662,19 +705,6 @@ public static String unescapeSQLString(String b) { return sb.toString(); } - public static void validatePartSpec( - Table tbl, - Map partSpec, - HiveParserASTNode astNode, - HiveConf conf, - boolean shouldBeFull, - FrameworkConfig frameworkConfig, - RelOptCluster cluster) - throws SemanticException { - tbl.validatePartColumnNames(partSpec, shouldBeFull); - validatePartColumnType(tbl, partSpec, astNode, conf, frameworkConfig, cluster); - } - private static boolean getPartExprNodeDesc( HiveParserASTNode astNode, HiveConf conf, @@ -743,7 +773,8 @@ static List doPhase1GetDistinctFuncExprs( return exprs; } - static String findSimpleTableName(HiveParserASTNode tabref, int aliasIndex) { + static String findSimpleTableName(HiveParserASTNode tabref, int aliasIndex) + throws SemanticException { assert tabref.getType() == HiveASTParser.TOK_TABREF; HiveParserASTNode tableTree = (HiveParserASTNode) (tabref.getChild(0)); @@ -1770,14 +1801,15 @@ public static AggInfo getHiveAggInfo( public static RelNode genValues( String tabAlias, - Table tmpTable, + CatalogTable catalogTable, HiveParserRowResolver rowResolver, RelOptCluster cluster, List> values) { - List tmpTableTypes = - tmpTable.getCols().stream() - .map(f -> TypeInfoUtils.getTypeInfoFromTypeString(f.getType())) - .collect(Collectors.toList()); + List tmpTableTypes = new ArrayList<>(); + DataType[] dataTypes = catalogTable.getSchema().getFieldDataTypes(); + for (DataType dataType : dataTypes) { + tmpTableTypes.add(HiveTypeUtil.toHiveTypeInfo(dataType, false)); + } RexBuilder rexBuilder = cluster.getRexBuilder(); // calcite types for each field @@ -1935,8 +1967,8 @@ private static Set findCorrelatedVar(RexNode node) { return allVars; } - private static void validatePartColumnType( - Table tbl, + public static void validatePartColumnType( + CatalogTable catalogTable, Map partSpec, HiveParserASTNode astNode, HiveConf conf, @@ -1959,10 +1991,22 @@ private static void validatePartColumnType( return; // All columns are dynamic, nothing to do. 
} - List parts = tbl.getPartitionKeys(); - Map partCols = new HashMap<>(parts.size()); - for (FieldSchema col : parts) { - partCols.put(col.getName(), col.getType().toLowerCase()); + List parts = catalogTable.getPartitionKeys(); + Map partColsTypes = new HashMap<>(parts.size()); + for (String col : parts) { + Optional dataType = + HiveParserUtils.fromUnresolvedSchema(catalogTable.getUnresolvedSchema()) + .getFieldDataType(col); + TypeInfo hiveType = + HiveTypeUtil.toHiveTypeInfo( + dataType.orElseThrow( + () -> + new SemanticException( + String.format( + "Can't get data type for column %s.", + col))), + false); + partColsTypes.put(col, hiveType); } for (Map.Entry astExprNodePair : astExprNodeMap.entrySet()) { @@ -1970,12 +2014,11 @@ private static void validatePartColumnType( if (astExprNodePair.getKey().getType() == HiveASTParser.Identifier) { astKeyName = stripIdentifierQuotes(astKeyName); } - String colType = partCols.get(astKeyName); + + TypeInfo expectedType = partColsTypes.get(astKeyName); ObjectInspector inputOI = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo( astExprNodePair.getValue().getTypeInfo()); - - TypeInfo expectedType = TypeInfoUtils.getTypeInfoFromTypeString(colType); ObjectInspector outputOI = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(expectedType); // Since partVal is a constant, it is safe to cast ExprNodeDesc to @@ -2006,9 +2049,9 @@ private static void validatePartColumnType( + " but input value is in type " + inputOI.getTypeName() + ". Convert " - + value.toString() + + value + " to " - + convertedValue.toString()); + + convertedValue); } } @@ -2021,17 +2064,17 @@ private static void validatePartColumnType( + " has been changed to " + astKeyName + "=" - + convertedValue.toString()); + + convertedValue); } partSpec.put(astKeyName, convertedValue.toString()); } } - private static void errorPartSpec(Map partSpec, List parts) + private static void errorPartSpec(Map partSpec, List parts) throws SemanticException { StringBuilder sb = new StringBuilder("Partition columns in the table schema are: ("); - for (FieldSchema fs : parts) { - sb.append(fs.getName()).append(", "); + for (String part : parts) { + sb.append(part).append(", "); } sb.setLength(sb.length() - 2); // remove the last ", " sb.append("), while the partitions specified in the query are: ("); @@ -2045,15 +2088,27 @@ private static void errorPartSpec(Map partSpec, List + new IllegalArgumentException( + String.format( + "Table %s doesn't exist.", + tableIdentifier.asSummaryString()))) + .getTable(); + } + /** Counterpart of hive's BaseSemanticAnalyzer.TableSpec. */ public static class TableSpec { + public ObjectIdentifier tableIdentifier; public String tableName; - public Table tableHandle; - public Map partSpec; // has to use LinkedHashMap to enforce order - public Partition partHandle; + public CatalogBaseTable table; + public Map partSpec = new HashMap<>(); + public CatalogPartitionSpec partHandle; public int numDynParts; // number of dynamic partition columns - public List - partitions; // involved partitions in TableScanOperator/FileSinkOperator /** SpecType. 
*/ public enum SpecType { @@ -2065,21 +2120,9 @@ public enum SpecType { public TableSpec.SpecType specType; public TableSpec( - Hive db, - HiveConf conf, - HiveParserASTNode ast, - FrameworkConfig frameworkConfig, - RelOptCluster cluster) - throws SemanticException { - this(db, conf, ast, true, false, frameworkConfig, cluster); - } - - public TableSpec( - Hive db, + CatalogManager catalogManager, HiveConf conf, HiveParserASTNode ast, - boolean allowDynamicPartitionsSpec, - boolean allowPartialPartitionsSpec, FrameworkConfig frameworkConfig, RelOptCluster cluster) throws SemanticException { @@ -2091,22 +2134,12 @@ public TableSpec( int childIndex = 0; numDynParts = 0; - try { - // get table metadata - tableName = getUnescapedName((HiveParserASTNode) ast.getChild(0)); - boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); - if (testMode) { - tableName = conf.getVar(HiveConf.ConfVars.HIVETESTMODEPREFIX) + tableName; - } - if (ast.getToken().getType() != HiveASTParser.TOK_CREATETABLE - && ast.getToken().getType() != HiveASTParser.TOK_CREATE_MATERIALIZED_VIEW) { - tableHandle = db.getTable(tableName); - } - } catch (InvalidTableException ite) { - throw new SemanticException( - HiveParserErrorMsg.getMsg(ErrorMsg.INVALID_TABLE, ast.getChild(0)), ite); - } catch (HiveException e) { - throw new SemanticException("Error while retrieving table metadata", e); + // get table metadata + tableIdentifier = + getObjectIdentifier(catalogManager, (HiveParserASTNode) ast.getChild(0)); + if (ast.getToken().getType() != HiveASTParser.TOK_CREATETABLE + && ast.getToken().getType() != HiveASTParser.TOK_CREATE_MATERIALIZED_VIEW) { + table = getCatalogBaseTable(catalogManager, tableIdentifier); } // get partition metadata if partition specified @@ -2115,7 +2148,6 @@ public TableSpec( && ast.getToken().getType() != HiveASTParser.TOK_CREATE_MATERIALIZED_VIEW) { childIndex = 1; HiveParserASTNode partspec = (HiveParserASTNode) ast.getChild(1); - partitions = new ArrayList(); // partSpec is a mapping from partition column name to its value. 
Map tmpPartSpec = new HashMap<>(partspec.getChildCount()); for (int i = 0; i < partspec.getChildCount(); ++i) { @@ -2124,28 +2156,27 @@ public TableSpec( String colName = unescapeIdentifier(partspecVal.getChild(0).getText().toLowerCase()); if (partspecVal.getChildCount() < 2) { // DP in the form of T partition (ds, hr) - if (allowDynamicPartitionsSpec) { - ++numDynParts; - } else { - throw new SemanticException( - ErrorMsg.INVALID_PARTITION.getMsg( - " - Dynamic partitions not allowed")); - } + ++numDynParts; } else { // in the form of T partition (ds="2010-03-03") val = stripQuotes(partspecVal.getChild(1).getText()); } tmpPartSpec.put(colName, val); } - // check if the columns, as well as value types in the partition() clause are valid - validatePartSpec( - tableHandle, tmpPartSpec, ast, conf, false, frameworkConfig, cluster); + if (!(table instanceof CatalogTable)) { + throw new IllegalArgumentException( + tableIdentifier.asSummaryString() + + " is not a table, partition is only allowed for table."); + } + + // check if the columns value type in the partition() clause are valid + validatePartColumnType( + (CatalogTable) table, tmpPartSpec, ast, conf, frameworkConfig, cluster); - List parts = tableHandle.getPartitionKeys(); - partSpec = new LinkedHashMap(partspec.getChildCount()); - for (FieldSchema fs : parts) { - String partKey = fs.getName(); - partSpec.put(partKey, tmpPartSpec.get(partKey)); + List parts = ((CatalogTable) table).getPartitionKeys(); + partSpec = new LinkedHashMap<>(partspec.getChildCount()); + for (String part : parts) { + partSpec.put(part, tmpPartSpec.get(part)); } // check if the partition spec is valid @@ -2163,15 +2194,15 @@ public TableSpec( errorPartSpec(partSpec, parts); } Iterator itrPsKeys = partSpec.keySet().iterator(); - for (FieldSchema fs : parts) { - if (!itrPsKeys.next().toLowerCase().equals(fs.getName().toLowerCase())) { + for (String part : parts) { + if (!itrPsKeys.next().equalsIgnoreCase(part)) { errorPartSpec(partSpec, parts); } } // check if static partition appear after dynamic partitions - for (FieldSchema fs : parts) { - if (partSpec.get(fs.getName().toLowerCase()) == null) { + for (String part : parts) { + if (partSpec.get(part.toLowerCase()) == null) { if (numStaPart > 0) { // found a DP, but there exists ST as subpartition throw new SemanticException( HiveParserErrorMsg.getMsg( @@ -2186,26 +2217,7 @@ public TableSpec( partHandle = null; specType = TableSpec.SpecType.DYNAMIC_PARTITION; } else { - try { - if (allowPartialPartitionsSpec) { - partitions = db.getPartitions(tableHandle, partSpec); - } else { - // this doesn't create partition. 
- partHandle = db.getPartition(tableHandle, partSpec, false); - if (partHandle == null) { - // if partSpec doesn't exists in DB, return a delegate one - // and the actual partition is created in MoveTask - partHandle = new Partition(tableHandle, partSpec, null); - } else { - partitions.add(partHandle); - } - } - } catch (HiveException e) { - throw new SemanticException( - HiveParserErrorMsg.getMsg( - ErrorMsg.INVALID_PARTITION, ast.getChild(childIndex)), - e); - } + partHandle = new CatalogPartitionSpec(partSpec); specType = TableSpec.SpecType.STATIC_PARTITION; } } else { @@ -2226,7 +2238,11 @@ public String toString() { if (partHandle != null) { return partHandle.toString(); } else { - return tableHandle.toString(); + return String.format( + "Table kind: %s, table schema: %s, table options: %s", + table.getTableKind(), + HiveParserUtils.fromUnresolvedSchema(table.getUnresolvedSchema()), + table.getOptions()); } } } @@ -2281,12 +2297,6 @@ private static class PKInfo { public PKInfo(String colName) { this.colName = colName; } - - public PKInfo(String colName, String constraintName, boolean rely) { - this.colName = colName; - this.constraintName = constraintName; - this.rely = rely; - } } /** Counterpart of hive's SemanticAnalyzer.CTEClause. */ @@ -2453,10 +2463,8 @@ public void analyzeSerdeProps(HiveParserASTNode child) throws SemanticException /** Counterpart of hive's SQLPrimaryKey. */ public static class PrimaryKey implements Serializable { - private static final long serialVersionUID = 3036210046732750293L; + private static final long serialVersionUID = 1L; - private final String dbName; - private final String tblName; private final String pk; private final String constraintName; private final boolean enable; @@ -2464,15 +2472,7 @@ public static class PrimaryKey implements Serializable { private final boolean rely; public PrimaryKey( - String dbName, - String tblName, - String pk, - String constraintName, - boolean enable, - boolean validate, - boolean rely) { - this.dbName = dbName; - this.tblName = tblName; + String pk, String constraintName, boolean enable, boolean validate, boolean rely) { this.pk = pk; this.constraintName = constraintName; this.enable = enable; @@ -2480,14 +2480,6 @@ public PrimaryKey( this.rely = rely; } - public String getDbName() { - return dbName; - } - - public String getTblName() { - return tblName; - } - public String getPk() { return pk; } @@ -2512,10 +2504,8 @@ public boolean isRely() { /** Counterpart of hive's SQLNotNullConstraint. 
*/ public static class NotNullConstraint implements Serializable { - private static final long serialVersionUID = 7642343368203203950L; + private static final long serialVersionUID = 1L; - private final String dbName; - private final String tblName; private final String colName; private final String constraintName; private final boolean enable; @@ -2523,15 +2513,11 @@ public static class NotNullConstraint implements Serializable { private final boolean rely; public NotNullConstraint( - String dbName, - String tblName, String colName, String constraintName, boolean enable, boolean validate, boolean rely) { - this.dbName = dbName; - this.tblName = tblName; this.colName = colName; this.constraintName = constraintName; this.enable = enable; @@ -2539,14 +2525,6 @@ public NotNullConstraint( this.rely = rely; } - public String getDbName() { - return dbName; - } - - public String getTblName() { - return tblName; - } - public String getColName() { return colName; } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQB.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQB.java index bdccd09b7f0c7..ca52eb984b12b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQB.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQB.java @@ -18,13 +18,17 @@ package org.apache.flink.table.planner.delegation.hive.copy; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.QBMetaData; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.planner.delegation.hive.HiveParserQBMetaData; + import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -39,18 +43,21 @@ public class HiveParserQB { private int numSels = 0; private int numSelDi = 0; + private final Map aliasToTabsOriginName; private final HashMap aliasToTabs; private final HashMap aliasToSubq; - private final HashMap viewAliasToViewSchema; + private final HashMap viewAliasToViewSchema; private final HashMap> aliasToProps; private final List aliases; private final HiveParserQBParseInfo qbp; - private final QBMetaData qbm; + private final HiveParserQBMetaData qbm; private final String id; private boolean isQuery; private boolean insideView; private Set aliasInsideView; - private final Map>> valuesTableToData = new HashMap<>(); + // tableName -> + private final Map>>> valuesTableToData = + new HashMap<>(); // used by PTFs /* @@ -85,6 +92,7 @@ public void print(String msg) { public HiveParserQB(String outerId, String alias, boolean isSubQ) { // Must be deterministic order maps - see HIVE-8707 + aliasToTabsOriginName = new LinkedHashMap<>(); aliasToTabs = new LinkedHashMap<>(); aliasToSubq = new LinkedHashMap<>(); viewAliasToViewSchema = new LinkedHashMap<>(); @@ -94,7 +102,7 @@ public HiveParserQB(String outerId, String alias, boolean isSubQ) { alias = alias.toLowerCase(); } qbp = new HiveParserQBParseInfo(alias, isSubQ); - qbm = new QBMetaData(); + qbm = new HiveParserQBMetaData(); // Must be deterministic order maps - 
see HIVE-8707 ptfNodeToSpec = new LinkedHashMap<>(); destToWindowingSpec = new LinkedHashMap<>(); @@ -119,7 +127,7 @@ public HiveParserQBParseInfo getParseInfo() { return qbp; } - public QBMetaData getMetaData() { + public HiveParserQBMetaData getMetaData() { return qbm; } @@ -136,8 +144,17 @@ public boolean exists(String alias) { return aliasToTabs.get(alias) != null || aliasToSubq.get(alias) != null; } - public void setTabAlias(String alias, String tabName) { - aliasToTabs.put(alias.toLowerCase(), tabName); + /** + * Maintain table alias -> (originTableName, qualifiedName). + * + * @param alias table alias + * @param originTableName table name that is actually specified, may be "table", "db.table", + * "catalog.db.table" + * @param qualifiedName table name with full path, is always "catalog.db.table" + */ + public void setTabAlias(String alias, String originTableName, String qualifiedName) { + aliasToTabsOriginName.put(alias.toLowerCase(), originTableName.toLowerCase()); + aliasToTabs.put(alias.toLowerCase(), qualifiedName); } public void setSubqAlias(String alias, HiveParserQBExpr qbexpr) { @@ -182,14 +199,19 @@ public String getTabNameForAlias(String alias) { return aliasToTabs.get(alias.toLowerCase()); } + public String getOriginTabNameForAlias(String alias) { + return aliasToTabsOriginName.get(alias.toLowerCase()); + } + public void rewriteViewToSubq( - String alias, String viewName, HiveParserQBExpr qbexpr, Table tab) { + String alias, String viewName, HiveParserQBExpr qbexpr, CatalogView view) { alias = alias.toLowerCase(); String tableName = aliasToTabs.remove(alias); - assert (viewName.equals(tableName)); + String originTableName = aliasToTabsOriginName.remove(alias); + assert (viewName.equals(tableName) || viewName.equals(originTableName)); aliasToSubq.put(alias, qbexpr); - if (tab != null) { - viewAliasToViewSchema.put(alias, tab); + if (view != null) { + viewAliasToViewSchema.put(alias, view); } } @@ -240,12 +262,8 @@ public boolean isCTAS() { /** Retrieve skewed column name for a table. */ public List<String> getSkewedColumnNames(String alias) { - List<String> skewedColNames = null; - if (null != qbm && null != qbm.getAliasToTable() && qbm.getAliasToTable().size() > 0) { - Table tbl = getMetaData().getTableForAlias(alias); - skewedColNames = tbl.getSkewedColNames(); - } - return skewedColNames; + // currently, skewed columns mean nothing to Flink, so we just return an empty list.
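Aside: the setTabAlias change above keeps two entries per alias, the name exactly as the user typed it and the fully qualified name. A minimal, self-contained sketch of that bookkeeping (not the actual HiveParserQB class; the catalog and table names are made up):

// Illustrative only; mirrors the two LinkedHashMaps the patch adds to HiveParserQB.
import java.util.LinkedHashMap;
import java.util.Map;

class TableAliasRegistrySketch {
    // deterministic iteration order, as the patch requires (see HIVE-8707)
    private final Map<String, String> aliasToOriginName = new LinkedHashMap<>();
    private final Map<String, String> aliasToQualifiedName = new LinkedHashMap<>();

    void setTabAlias(String alias, String originTableName, String qualifiedName) {
        aliasToOriginName.put(alias.toLowerCase(), originTableName.toLowerCase());
        aliasToQualifiedName.put(alias.toLowerCase(), qualifiedName);
    }

    String getOriginName(String alias) {
        return aliasToOriginName.get(alias.toLowerCase());
    }

    String getQualifiedName(String alias) {
        return aliasToQualifiedName.get(alias.toLowerCase());
    }

    public static void main(String[] args) {
        TableAliasRegistrySketch qb = new TableAliasRegistrySketch();
        // the user wrote "db1.src"; under the (hypothetical) current catalog "myhive"
        // it is qualified to "myhive.db1.src"
        qb.setTabAlias("s", "db1.src", "myhive.db1.src");
        System.out.println(qb.getOriginName("s"));    // db1.src (used to match CTE names)
        System.out.println(qb.getQualifiedName("s")); // myhive.db1.src (used for catalog lookups)
    }
}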
+ return Collections.emptyList(); } public HiveParserPTFInvocationSpec getPTFInvocationSpec(HiveParserASTNode node) { @@ -278,7 +296,7 @@ public int incrNumSubQueryPredicates() { return ++numSubQueryPredicates; } - public HashMap getViewToTabSchema() { + public HashMap getViewToTabSchema() { return viewAliasToViewSchema; } @@ -313,7 +331,7 @@ public boolean isMaterializedView() { return false; } - public Map>> getValuesTableToData() { + public Map>>> getValuesTableToData() { return valuesTableToData; } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQBParseInfo.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQBParseInfo.java index 0c37710202b08..ba6778a151170 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQBParseInfo.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQBParseInfo.java @@ -158,8 +158,7 @@ public void addInsertIntoTable(String fullName, HiveParserASTNode ast) { } // See also {@link #getInsertOverwriteTables()} - public boolean isInsertIntoTable(String dbName, String table) { - String fullName = dbName + "." + table; + public boolean isInsertIntoTable(String fullName) { return insertIntoTables.containsKey(fullName.toLowerCase()); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java index 5275493c86076..17b7209aa69ce 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java @@ -18,8 +18,19 @@ package org.apache.flink.table.planner.delegation.hive.copy; -import org.apache.flink.table.catalog.hive.client.HiveShim; -import org.apache.flink.table.catalog.hive.util.HiveTableUtil; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.planner.delegation.hive.HiveParserTypeCheckProcFactory; import org.apache.flink.table.planner.delegation.hive.HiveParserUtils; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.TableSpec; @@ -32,6 +43,7 @@ import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg; +import 
org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; import org.antlr.runtime.ClassicToken; @@ -41,10 +53,8 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.tools.FrameworkConfig; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -56,17 +66,12 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; -import org.apache.hadoop.hive.ql.io.HiveOutputFormat; -import org.apache.hadoop.hive.ql.io.RCFileInputFormat; -import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; -import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx; import org.apache.hadoop.hive.ql.parse.JoinType; @@ -80,14 +85,12 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; -import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.mapred.InputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.nio.file.Files; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -97,6 +100,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; @@ -107,9 +111,12 @@ import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.findTabRefIdxs; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getAliasId; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getColumnInternalName; +import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getObjectIdentifier; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getUnescapedName; +import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getUnescapedOriginTableName; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.handleQueryWindowClauses; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.initPhase1Ctx; +import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.parseCompoundName; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.processPTFPartitionSpec; import static 
org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.processWindowFunction; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.readProps; @@ -117,7 +124,7 @@ import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.unescapeIdentifier; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.unescapeSQLString; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.unparseExprForValuesClause; -import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.validatePartSpec; +import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.validatePartColumnType; /** * Counterpart of hive's org.apache.hadoop.hive.ql.parse.SemanticAnalyzer and adapted to our needs. @@ -152,13 +159,6 @@ public class HiveParserSemanticAnalyzer { private final String autogenColAliasPrfxLbl; private final boolean autogenColAliasPrfxIncludeFuncName; - // Keep track of view alias to read entity corresponding to the view - // For eg: for a query like 'select * from V3', where V3 -> V2, V2 -> V1, V1 -> T - // keeps track of aliases for V3, V3:V2, V3:V2:V1. - // This is used when T is added as an input for the query, the parents of T is - // derived from the alias V3:V2:V1:T - private final Map viewAliasToInput; - // need merge isDirect flag to input even if the newInput does not have a parent private boolean mergeIsDirect; @@ -179,9 +179,6 @@ public class HiveParserSemanticAnalyzer { protected HiveParserBaseSemanticAnalyzer.AnalyzeRewriteContext analyzeRewrite; - // A mapping from a tableName to a table object in metastore. - Map tabNameToTabObject; - public ColumnAccessInfo columnAccessInfo; private final HiveConf conf; @@ -189,10 +186,7 @@ public class HiveParserSemanticAnalyzer { public HiveParserContext ctx; QueryProperties queryProperties; - - private final HiveShim hiveShim; - - private final Hive db; + private Hive db; // ReadEntities that are passed to the hooks. 
protected HashSet inputs = new LinkedHashSet<>(); @@ -202,23 +196,19 @@ public class HiveParserSemanticAnalyzer { private final FrameworkConfig frameworkConfig; private final RelOptCluster cluster; + private final CatalogManager catalogManager; + public HiveParserSemanticAnalyzer( HiveParserQueryState queryState, - HiveShim hiveShim, FrameworkConfig frameworkConfig, - RelOptCluster cluster) + RelOptCluster cluster, + CatalogManager catalogManager) throws SemanticException { this.queryState = queryState; this.conf = queryState.getConf(); - this.hiveShim = hiveShim; - try { - this.db = Hive.get(conf); - } catch (HiveException e) { - throw new SemanticException(e); - } + this.catalogManager = catalogManager; nameToSplitSample = new HashMap<>(); prunedPartitions = new HashMap<>(); - tabNameToTabObject = new HashMap<>(); unparseTranslator = new HiveParserUnparseTranslator(conf); autogenColAliasPrfxLbl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL); @@ -228,10 +218,8 @@ public HiveParserSemanticAnalyzer( queryProperties = new QueryProperties(); aliasToCTEs = new HashMap<>(); globalLimitCtx = new GlobalLimitCtx(); - viewAliasToInput = new HashMap<>(); mergeIsDirect = true; noscan = partialscan = false; - tabNameToTabObject = new HashMap<>(); defaultJoinMerge = !Boolean.parseBoolean(conf.get("hive.merge.nway.joins", "true")); disableJoinMerge = defaultJoinMerge; this.frameworkConfig = frameworkConfig; @@ -262,7 +250,6 @@ private void reset(boolean clearPartsCache) { } else { mergeIsDirect = false; } - tabNameToTabObject.clear(); qb = null; ast = null; disableJoinMerge = defaultJoinMerge; @@ -274,7 +261,6 @@ private void reset(boolean clearPartsCache) { viewSelect = null; ctesExpanded = null; globalLimitCtx.disableOpt(); - viewAliasToInput.clear(); unparseTranslator.clear(); queryProperties.clear(); } @@ -486,7 +472,13 @@ private String processTable(HiveParserQB qb, HiveParserASTNode tabref) HiveParserASTNode tableTree = (HiveParserASTNode) (tabref.getChild(0)); - String tabIdName = getUnescapedName(tableTree).toLowerCase(); + String qualifiedTableName = + getUnescapedName( + tableTree, + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase()) + .toLowerCase(); + String originTableName = getUnescapedOriginTableName(tableTree); String alias = findSimpleTableName(tabref, aliasIndex); @@ -569,7 +561,7 @@ private String processTable(HiveParserQB qb, HiveParserASTNode tabref) nameToSplitSample.put(aliasId, sample); } // Insert this map into the stats - qb.setTabAlias(alias, tabIdName); + qb.setTabAlias(alias, originTableName, qualifiedTableName); if (qb.isInsideView()) { qb.getAliasInsideView().add(alias.toLowerCase()); } @@ -579,9 +571,11 @@ private String processTable(HiveParserQB qb, HiveParserASTNode tabref) // if alias to CTE contains the table name, we do not do the translation because // cte is actually a subquery. 
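Aside: processTable above expands the referenced name against the current catalog and database, the same rule CatalogManager.qualifyIdentifier applies elsewhere in the patch. A self-contained sketch of that expansion, with the session defaults assumed for illustration:

import java.util.Arrays;

class TableNameQualifierSketch {
    /**
     * Expands a 1-, 2- or 3-part table name to the full "catalog.db.table" form,
     * filling in the missing parts from the current catalog/database.
     */
    static String qualify(String name, String currentCatalog, String currentDatabase) {
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1:
                return String.join(".", currentCatalog, currentDatabase, parts[0]);
            case 2:
                return String.join(".", currentCatalog, parts[0], parts[1]);
            case 3:
                return name;
            default:
                throw new IllegalArgumentException("Unexpected table name: " + name);
        }
    }

    public static void main(String[] args) {
        // assumed session state: current catalog "myhive", current database "default"
        for (String n : Arrays.asList("src", "db1.src", "m_catalog.db.t1")) {
            System.out.println(n + " -> " + qualify(n, "myhive", "default"));
        }
    }
}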
- if (!this.aliasToCTEs.containsKey(tabIdName)) { + if (!this.aliasToCTEs.containsKey(qualifiedTableName)) { unparseTranslator.addTableNameTranslation( - tableTree, SessionState.get().getCurrentDatabase()); + tableTree, + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase()); if (aliasIndex != 0) { unparseTranslator.addIdentifierTranslation( (HiveParserASTNode) tabref.getChild(aliasIndex)); @@ -636,9 +630,9 @@ private HiveParserASTNode genValuesTempTable(HiveParserASTNode originalFrom, Hiv List rows = valuesTable.getChildren(); List> valuesData = new ArrayList<>(rows.size()); + List fieldsName = new ArrayList<>(); + List fieldsDataType = new ArrayList<>(); try { - List fields = new ArrayList<>(); - boolean firstRow = true; for (Node n : rows) { // Each of the children of TOK_VALUES_TABLE will be a TOK_VALUE_ROW @@ -654,7 +648,8 @@ private HiveParserASTNode genValuesTempTable(HiveParserASTNode originalFrom, Hiv for (Node n1 : columns) { HiveParserASTNode column = (HiveParserASTNode) n1; if (firstRow) { - fields.add(new FieldSchema("tmp_values_col" + nextColNum++, "string", "")); + fieldsName.add("tmp_values_col" + nextColNum++); + fieldsDataType.add(DataTypes.STRING()); } data.add(unparseExprForValuesClause(column)); } @@ -662,23 +657,18 @@ private HiveParserASTNode genValuesTempTable(HiveParserASTNode originalFrom, Hiv valuesData.add(data); } - // Step 2, create a temp table - Table table = db.newTable(tableName); - table.setSerializationLib(conf.getVar(HiveConf.ConfVars.HIVEDEFAULTSERDE)); - HiveTableUtil.setStorageFormat(table.getSd(), "TextFile", conf); - table.setFields(fields); - // make up a path for this table - File dataLocation = Files.createTempDirectory(tableName).toFile(); - try { - table.setDataLocation(new Path(dataLocation.toURI().toString(), tableName)); - table.getTTable().setTemporary(true); - table.setStoredAsSubDirectories(false); - db.createTable(table, false); - } finally { - FileUtils.deleteQuietly(dataLocation); - } + // Step 2, create a temp table to maintain table schema + CatalogTable tempTable = + new CatalogTableImpl( + TableSchema.builder() + .fields( + fieldsName.toArray(new String[0]), + fieldsDataType.toArray(new DataType[0])) + .build(), + Collections.emptyMap(), + "values temp table"); // remember the data for this table - qb.getValuesTableToData().put(tableName, valuesData); + qb.getValuesTableToData().put(tableName, Tuple2.of(tempTable, valuesData)); } catch (Exception e) { throw new SemanticException("Failed to create temp table for VALUES", e); } @@ -958,11 +948,11 @@ public boolean doPhase1( break; case HiveASTParser.TOK_INSERT_INTO: - String currentDatabase = SessionState.get().getCurrentDatabase(); String tabName = getUnescapedName( (HiveParserASTNode) ast.getChild(0).getChild(0), - currentDatabase); + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase()); qbp.addInsertIntoTable(tabName, ast); // TODO: hive doesn't break here, so we copy what's below here handleTokDestination(ctx1, ast, qbp, plannerCtx); @@ -1119,8 +1109,11 @@ public boolean doPhase1( String tableName = getUnescapedName((HiveParserASTNode) ast.getChild(0).getChild(0)) .toLowerCase(); + String originTableName = + getUnescapedOriginTableName( + (HiveParserASTNode) ast.getChild(0).getChild(0)); - qb.setTabAlias(tableName, tableName); + qb.setTabAlias(tableName, originTableName, tableName); qb.addAlias(tableName); qb.getParseInfo().setIsAnalyzeCommand(true); qb.getParseInfo().setNoScanAnalyzeCommand(this.noscan); @@ -1150,8 +1143,9 @@ public 
boolean doPhase1( if (destination.getChildCount() == 2 && tab.getChildCount() == 2 && destination.getChild(1).getType() == HiveASTParser.TOK_IFNOTEXISTS) { - String name = - getUnescapedName((HiveParserASTNode) tab.getChild(0)).toLowerCase(); + ObjectIdentifier tableIdentifier = + getObjectIdentifier( + catalogManager, (HiveParserASTNode) tab.getChild(0)); Tree partitions = tab.getChild(1); int numChildren = partitions.getChildCount(); @@ -1171,33 +1165,26 @@ public boolean doPhase1( ErrorMsg.INSERT_INTO_DYNAMICPARTITION_IFNOTEXISTS.getMsg( partition.toString())); } - Table table; - try { - table = getTableObjectByName(name); - } catch (HiveException ex) { - throw new SemanticException(ex); - } - try { - Partition parMetaData = db.getPartition(table, partition, false); - // Check partition exists if it exists skip the overwrite - if (parMetaData != null) { - phase1Result = false; - skipRecursion = true; - LOG.info( - "Partition already exists so insert into overwrite " - + "skipped for partition : " - + parMetaData.toString()); - break; - } - } catch (HiveException e) { - LOG.info("Error while getting metadata : ", e); + Optional catalogPartition = + catalogManager.getPartition( + tableIdentifier, new CatalogPartitionSpec(partition)); + // Check partition exists if it exists skip the overwrite + if (catalogPartition.isPresent()) { + phase1Result = false; + skipRecursion = true; + LOG.info( + "Partition already exists so insert into overwrite " + + "skipped for partition : " + + partition); + break; } - validatePartSpec( - table, + CatalogTable catalogTable = + getCatalogTable(tableIdentifier.asSummaryString(), qb); + validatePartColumnType( + catalogTable, partition, (HiveParserASTNode) tab, conf, - false, frameworkConfig, cluster); } @@ -1251,7 +1238,8 @@ private void handleTokDestination( String fullTableName = getUnescapedName( (HiveParserASTNode) ast.getChild(0).getChild(0), - SessionState.get().getCurrentDatabase()); + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase()); qbp.getInsertOverwriteTables().put(fullTableName, ast); } } @@ -1315,7 +1303,8 @@ private void handleInsertStatementSpecPhase1( String fullTableName = getUnescapedName( (HiveParserASTNode) ast.getChild(0).getChild(0), - SessionState.get().getCurrentDatabase()); + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase()); qbp.setDestSchemaForClause(ctx1.dest, targetColNames); Set targetColumns = new HashSet<>(targetColNames); if (targetColNames.size() != targetColumns.size()) { @@ -1326,21 +1315,16 @@ private void handleInsertStatementSpecPhase1( + fullTableName + " table schema specification")); } - Table targetTable; - try { - targetTable = db.getTable(fullTableName, false); - } catch (HiveException ex) { - LOG.error("Error processing HiveASTParser.TOK_DESTINATION: " + ex.getMessage(), ex); - throw new SemanticException(ex); - } - if (targetTable == null) { - throw new SemanticException( - HiveParserUtils.generateErrorMessage( - ast, "Unable to access metadata for table " + fullTableName)); - } - for (FieldSchema f : targetTable.getCols()) { + CatalogTable targetTable = getCatalogTable(fullTableName, qb); + Set partitionColumns = new HashSet<>(targetTable.getPartitionKeys()); + TableSchema tableSchema = + HiveParserUtils.fromUnresolvedSchema(targetTable.getUnresolvedSchema()); + for (String column : tableSchema.getFieldNames()) { // parser only allows foo(a,b), not foo(foo.a, foo.b) - targetColumns.remove(f.getName()); + // only consider non-partition col + if 
(!partitionColumns.contains(column)) { + targetColumns.remove(column); + } } // here we need to see if remaining columns are dynamic partition columns if (!targetColumns.isEmpty()) { @@ -1441,8 +1425,8 @@ private void gatherCTEReferences( HiveParserQB qb, HiveParserBaseSemanticAnalyzer.CTEClause current) throws HiveException { for (String alias : qb.getTabAliases()) { - String tabName = qb.getTabNameForAlias(alias); - String cteName = tabName.toLowerCase(); + String originTabName = qb.getOriginTabNameForAlias(alias); + String cteName = originTabName.toLowerCase(); HiveParserBaseSemanticAnalyzer.CTEClause cte = findCTEFromName(qb, cteName); if (cte != null) { @@ -1474,17 +1458,13 @@ private void gatherCTEReferences( } } - public void getMetaData(HiveParserQB qb) throws SemanticException { - getMetaData(qb, false); - } - public void getMetaData(HiveParserQB qb, boolean enableMaterialization) throws SemanticException { try { if (enableMaterialization) { getMaterializationMetadata(qb); } - getMetaData(qb, null); + getMetaData(qb); } catch (HiveException e) { LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); if (e instanceof SemanticException) { @@ -1494,17 +1474,17 @@ public void getMetaData(HiveParserQB qb, boolean enableMaterialization) } } - private void getMetaData(HiveParserQBExpr qbexpr, ReadEntity parentInput) throws HiveException { + private void getMetaData(HiveParserQBExpr qbexpr) throws HiveException { if (qbexpr.getOpcode() == HiveParserQBExpr.Opcode.NULLOP) { - getMetaData(qbexpr.getQB(), parentInput); + getMetaData(qbexpr.getQB()); } else { - getMetaData(qbexpr.getQBExpr1(), parentInput); - getMetaData(qbexpr.getQBExpr2(), parentInput); + getMetaData(qbexpr.getQBExpr1()); + getMetaData(qbexpr.getQBExpr2()); } } @SuppressWarnings("nls") - private void getMetaData(HiveParserQB qb, ReadEntity parentInput) throws HiveException { + private void getMetaData(HiveParserQB qb) throws HiveException { LOG.info("Get metadata for source tables"); // Go over the tables and populate the related structures. We have to materialize the table @@ -1512,22 +1492,29 @@ private void getMetaData(HiveParserQB qb, ReadEntity parentInput) throws HiveExc // modify it in the middle for view rewrite. List tabAliases = new ArrayList<>(qb.getTabAliases()); - // Keep track of view alias to view name and read entity + // Keep track of view alias to view name // For eg: for a query like 'select * from V3', where V3 -> V2, V2 -> V1, V1 -> T - // keeps track of full view name and read entity corresponding to alias V3, V3:V2, V3:V2:V1. - // This is needed for tracking the dependencies for inputs, along with their parents. - Map> aliasToViewInfo = new HashMap<>(); + // keeps track of full view name corresponding to alias V3, V3:V2, V3:V2:V1. + Map aliasToViewInfo = new HashMap<>(); // used to capture view to SQ conversions. This is used to check for recursive CTE // invocations. 
Map sqAliasToCTEName = new HashMap<>(); for (String alias : tabAliases) { + // tabName will always be "catalog.db.table" String tabName = qb.getTabNameForAlias(alias); - String cteName = tabName.toLowerCase(); - - Table tab = db.getTable(tabName, false); - if (tab == null || tab.getDbName().equals(SessionState.get().getCurrentDatabase())) { + ObjectIdentifier tableIdentifier = parseCompoundName(catalogManager, tabName); + // get the origin table name like "table", "db.table", "catalog.db.table" that user + // specifies + String originTabName = qb.getOriginTabNameForAlias(alias); + String cteName = originTabName.toLowerCase(); + + CatalogBaseTable tab = getCatalogBaseTable(tabName, qb, false); + if (tab == null + || tableIdentifier + .getDatabaseName() + .equals(catalogManager.getCurrentDatabase())) { // we first look for this alias from CTE, and then from catalog. HiveParserBaseSemanticAnalyzer.CTEClause cte = findCTEFromName(qb, cteName); if (cte != null) { @@ -1549,106 +1536,27 @@ private void getMetaData(HiveParserQB qb, ReadEntity parentInput) throws HiveExc throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(alias)); } } - if (tab.isView()) { + if (tab instanceof CatalogView) { if (qb.getParseInfo().isAnalyzeCommand()) { throw new SemanticException(ErrorMsg.ANALYZE_VIEW.getMsg()); } - String fullViewName = tab.getDbName() + "." + tab.getTableName(); // Prevent view cycles - if (viewsExpanded.contains(fullViewName)) { + if (viewsExpanded.contains(tabName)) { throw new SemanticException( "Recursive view " - + fullViewName + + tabName + " detected (cycle: " + StringUtils.join(viewsExpanded, " -> ") + " -> " - + fullViewName + + tabName + ")."); } - replaceViewReferenceWithDefinition(qb, tab, tabName, alias); - // This is the last time we'll see the Table objects for views, so add it to the - // inputs now. isInsideView will tell if this view is embedded in another view. - // If the view is Inside another view, it should have at least one parent - if (qb.isInsideView() && parentInput == null) { - parentInput = - PlanUtils.getParentViewInfo(getAliasId(alias, qb), viewAliasToInput); - } - ReadEntity viewInput = new ReadEntity(tab, parentInput, !qb.isInsideView()); - viewInput = PlanUtils.addInput(inputs, viewInput); - aliasToViewInfo.put(alias, new ObjectPair<>(fullViewName, viewInput)); - String aliasId = getAliasId(alias, qb); - if (aliasId != null) { - aliasId = aliasId.replace(SUBQUERY_TAG_1, "").replace(SUBQUERY_TAG_2, ""); - } - viewAliasToInput.put(aliasId, viewInput); + replaceViewReferenceWithDefinition(qb, (CatalogView) tab, tabName, alias); + aliasToViewInfo.put(alias, tabName); continue; } - if (!InputFormat.class.isAssignableFrom(tab.getInputFormatClass())) { - throw new SemanticException( - HiveParserUtils.generateErrorMessage( - qb.getParseInfo().getSrcForAlias(alias), - ErrorMsg.INVALID_INPUT_FORMAT_TYPE.getMsg())); - } - - qb.getMetaData().setSrcForAlias(alias, tab); - - if (qb.getParseInfo().isAnalyzeCommand()) { - // allow partial partition specification for nonscan since noscan is fast. 
- TableSpec ts = - new TableSpec( - db, - conf, - (HiveParserASTNode) ast.getChild(0), - true, - this.noscan, - frameworkConfig, - cluster); - if (ts.specType == SpecType.DYNAMIC_PARTITION) { // dynamic partitions - try { - ts.partitions = db.getPartitionsByNames(ts.tableHandle, ts.partSpec); - } catch (HiveException e) { - throw new SemanticException( - HiveParserUtils.generateErrorMessage( - qb.getParseInfo().getSrcForAlias(alias), - "Cannot get partitions for " + ts.partSpec), - e); - } - } - // validate partial scan command - HiveParserQBParseInfo qbpi = qb.getParseInfo(); - if (qbpi.isPartialScanAnalyzeCommand()) { - Class inputFormatClass = null; - switch (ts.specType) { - case TABLE_ONLY: - case DYNAMIC_PARTITION: - inputFormatClass = ts.tableHandle.getInputFormatClass(); - break; - case STATIC_PARTITION: - inputFormatClass = ts.partHandle.getInputFormatClass(); - break; - default: - assert false; - } - if (!(inputFormatClass.equals(RCFileInputFormat.class) - || inputFormatClass.equals(OrcInputFormat.class))) { - throw new SemanticException( - "ANALYZE TABLE PARTIALSCAN doesn't support non-RCfile."); - } - } - - qb.getParseInfo().addTableSpec(alias, ts); - } - - ReadEntity parentViewInfo = - PlanUtils.getParentViewInfo(getAliasId(alias, qb), viewAliasToInput); - // Temporary tables created during the execution are not the input sources - if (!HiveParserUtils.isValuesTempTable(alias)) { - HiveParserUtils.addInput( - inputs, - new ReadEntity(tab, parentViewInfo, parentViewInfo == null), - mergeIsDirect); - } + qb.getMetaData().setSrcForAlias(alias, tabName, (CatalogTable) tab); } LOG.info("Get metadata for subqueries"); @@ -1656,15 +1564,13 @@ private void getMetaData(HiveParserQB qb, ReadEntity parentInput) throws HiveExc for (String alias : qb.getSubqAliases()) { boolean wasView = aliasToViewInfo.containsKey(alias); boolean wasCTE = sqAliasToCTEName.containsKey(alias); - ReadEntity newParentInput = null; if (wasView) { - viewsExpanded.add(aliasToViewInfo.get(alias).getFirst()); - newParentInput = aliasToViewInfo.get(alias).getSecond(); + viewsExpanded.add(aliasToViewInfo.get(alias)); } else if (wasCTE) { ctesExpanded.add(sqAliasToCTEName.get(alias)); } HiveParserQBExpr qbexpr = qb.getSubqForAlias(alias); - getMetaData(qbexpr, newParentInput); + getMetaData(qbexpr); if (wasView) { viewsExpanded.remove(viewsExpanded.size() - 1); } else if (wasCTE) { @@ -1685,38 +1591,28 @@ private void getMetaData(HiveParserQB qb, ReadEntity parentInput) throws HiveExc switch (ast.getToken().getType()) { case HiveASTParser.TOK_TAB: { - TableSpec ts = new TableSpec(db, conf, ast, frameworkConfig, cluster); - if (ts.tableHandle.isView() - || hiveShim.isMaterializedView(ts.tableHandle)) { + TableSpec ts = + new TableSpec(catalogManager, conf, ast, frameworkConfig, cluster); + if (ts.table instanceof CatalogView) { throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg()); } - Class outputFormatClass = ts.tableHandle.getOutputFormatClass(); - if (!ts.tableHandle.isNonNative() - && !HiveOutputFormat.class.isAssignableFrom(outputFormatClass)) { - throw new SemanticException( - HiveParserErrorMsg.getMsg( - ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE, - ast, - "The class is " + outputFormatClass.toString())); - } - boolean isTableWrittenTo = qb.getParseInfo() - .isInsertIntoTable( - ts.tableHandle.getDbName(), - ts.tableHandle.getTableName()); + .isInsertIntoTable(ts.tableIdentifier.asSummaryString()); isTableWrittenTo |= (qb.getParseInfo() .getInsertOverwriteTables() .get( getUnescapedName( (HiveParserASTNode) 
ast.getChild(0), - ts.tableHandle.getDbName())) + ts.tableIdentifier.getCatalogName(), + ts.tableIdentifier + .getDatabaseName())) != null); assert isTableWrittenTo : "Inconsistent data structure detected: we are writing to " - + ts.tableHandle + + ts.tableIdentifier.asSummaryString() + " in " + name + " but it's not in isInsertIntoTable() or getInsertOverwriteTables()"; @@ -1725,18 +1621,30 @@ private void getMetaData(HiveParserQB qb, ReadEntity parentInput) throws HiveExc // but whether the table itself is partitioned is not know. if (ts.specType != SpecType.STATIC_PARTITION) { // This is a table or dynamic partition - qb.getMetaData().setDestForAlias(name, ts.tableHandle); + qb.getMetaData() + .setDestForAlias( + name, + ts.tableIdentifier.asSummaryString(), + (CatalogTable) ts.table); // has dynamic as well as static partitions if (ts.partSpec != null && ts.partSpec.size() > 0) { qb.getMetaData().setPartSpecForAlias(name, ts.partSpec); } } else { + // rewrite QBMetaData // This is a partition - qb.getMetaData().setDestForAlias(name, ts.partHandle); + qb.getMetaData() + .setDestForAlias( + name, + ts.tableIdentifier.asSummaryString(), + (CatalogTable) ts.table, + ts.partHandle); } if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { // Add the table spec for the destination table. - qb.getParseInfo().addTableSpec(ts.tableName.toLowerCase(), ts); + qb.getParseInfo() + .addTableSpec( + ts.tableIdentifier.asSummaryString().toLowerCase(), ts); } break; } @@ -1781,9 +1689,18 @@ private void getMetaData(HiveParserQB qb, ReadEntity parentInput) throws HiveExc conf, HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { TableSpec ts = new TableSpec( - db, conf, this.ast, frameworkConfig, cluster); + catalogManager, + conf, + this.ast, + frameworkConfig, + cluster); // Add the table spec for the destination table. - qb.getParseInfo().addTableSpec(ts.tableName.toLowerCase(), ts); + qb.getParseInfo() + .addTableSpec( + ts.tableIdentifier + .asSummaryString() + .toLowerCase(), + ts); } } else { // This is the only place where isQuery is set to true; it defaults @@ -1840,20 +1757,21 @@ private void getMetaData(HiveParserQB qb, ReadEntity parentInput) throws HiveExc } private void replaceViewReferenceWithDefinition( - HiveParserQB qb, Table tab, String tabName, String alias) throws SemanticException { + HiveParserQB qb, CatalogView catalogView, String viewName, String alias) + throws SemanticException { HiveParserASTNode viewTree; final HiveParserASTNodeOrigin viewOrigin = new HiveParserASTNodeOrigin( "VIEW", - tab.getTableName(), - tab.getViewExpandedText(), + viewName, + catalogView.getExpandedQuery(), alias, qb.getParseInfo().getSrcForAlias(alias)); try { // Reparse text, passing null for context to avoid clobbering // the top-level token stream. 
- String viewText = tab.getViewExpandedText(); - viewTree = HiveASTParseUtils.parse(viewText, ctx, tab.getCompleteName()); + String viewText = catalogView.getExpandedQuery(); + viewTree = HiveASTParseUtils.parse(viewText, ctx, viewName); Dispatcher nodeOriginDispatcher = (nd, stack, nodeOutputs) -> { @@ -1880,9 +1798,9 @@ private void replaceViewReferenceWithDefinition( && !qb.isInsideView() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) || HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) { - qb.rewriteViewToSubq(alias, tabName, qbexpr, tab); + qb.rewriteViewToSubq(alias, viewName, qbexpr, catalogView); } else { - qb.rewriteViewToSubq(alias, tabName, qbexpr, null); + qb.rewriteViewToSubq(alias, viewName, qbexpr, null); } } @@ -2158,6 +2076,45 @@ public String getAutogenColAliasPrfxLbl() { return this.autogenColAliasPrfxLbl; } + public CatalogBaseTable getCatalogBaseTable(String tableName, HiveParserQB qb) { + return getCatalogBaseTable(tableName, qb, true); + } + + @Nullable + public CatalogBaseTable getCatalogBaseTable( + String tableName, HiveParserQB qb, boolean throwException) { + // first try to get the table from QB, temp table will be stored in here. + // the tableName passed is resolved as 'catalog.db.table', but the temp table is stored as + // unresolved which only contains table name, so we need to get the actual table name from + // the passed 'tableName' + String tempTableName = parseCompoundName(catalogManager, tableName).getObjectName(); + if (qb.getValuesTableToData().containsKey(tempTableName)) { + return qb.getValuesTableToData().get(tempTableName).f0; + } + // then get the table from catalogs + ObjectIdentifier tableIdentifier = + catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(tableName.split("\\."))); + Optional optionalTab = catalogManager.getTable(tableIdentifier); + if (!optionalTab.isPresent()) { + if (throwException) { + throw new IllegalArgumentException( + String.format("Table %s doesn't exist.", tableName)); + } else { + return null; + } + } else { + return optionalTab.get().getTable(); + } + } + + public CatalogTable getCatalogTable(String tableName, HiveParserQB qb) { + CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableName, qb); + if (!(catalogBaseTable instanceof CatalogTable)) { + throw new IllegalArgumentException(tableName + " isn't a table."); + } + return (CatalogTable) catalogBaseTable; + } + public boolean autogenColAliasPrfxIncludeFuncName() { return this.autogenColAliasPrfxIncludeFuncName; } @@ -2226,16 +2183,6 @@ public void init(boolean clearPartsCache) { this.qb = new HiveParserQB(null, null, false); } - private Table getTableObjectByName(String tableName) throws HiveException { - if (!tabNameToTabObject.containsKey(tableName)) { - Table table = db.getTable(tableName); - tabNameToTabObject.put(tableName, table); - return table; - } else { - return tabNameToTabObject.get(tableName); - } - } - public boolean genResolvedParseTree(HiveParserASTNode ast, HiveParserPlannerContext plannerCtx) throws SemanticException { this.ast = ast; diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserUnparseTranslator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserUnparseTranslator.java index 5b6300d6bd1ea..e023312e4a92d 100644 --- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserUnparseTranslator.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserUnparseTranslator.java @@ -117,7 +117,8 @@ public void addTranslation(HiveParserASTNode node, String replacementText) { translations.put(tokenStartIndex, translation); } - public void addTableNameTranslation(HiveParserASTNode tableName, String currentDatabaseName) { + public void addTableNameTranslation( + HiveParserASTNode tableName, String currentCatalog, String currentDatabaseName) { if (!enabled) { return; } @@ -126,16 +127,21 @@ public void addTableNameTranslation(HiveParserASTNode tableName, String currentD return; } assert (tableName.getToken().getType() == HiveASTParser.TOK_TABNAME); - assert (tableName.getChildCount() <= 2); + assert (tableName.getChildCount() <= 3); - if (tableName.getChildCount() == 2) { - addIdentifierTranslation((HiveParserASTNode) tableName.getChild(0)); - addIdentifierTranslation((HiveParserASTNode) tableName.getChild(1)); + if (tableName.getChildCount() > 1) { + for (int i = 0; i < tableName.getChildCount(); i++) { + // add identifier translation for catalog, database, table + addIdentifierTranslation((HiveParserASTNode) tableName.getChild(i)); + } } else { // transform the table reference to an absolute reference (i.e., "db.table") StringBuilder replacementText = new StringBuilder(); - replacementText.append(HiveUtils.unparseIdentifier(currentDatabaseName, conf)); - replacementText.append('.'); + replacementText.append( + String.format( + "%s.%s.", + HiveUtils.unparseIdentifier(currentCatalog, conf), + HiveUtils.unparseIdentifier(currentDatabaseName, conf))); HiveParserASTNode identifier = (HiveParserASTNode) tableName.getChild(0); String identifierText = diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g index 82ea870f6f009..77ed836453372 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g @@ -216,6 +216,9 @@ tableName @init { gParent.pushMsg("table name", state); } @after { gParent.popMsg(state); } : + (identifier DOT identifier DOT identifier) => cat=identifier DOT db=identifier DOT tab=identifier + -> ^(TOK_TABNAME $cat $db $tab) + | db=identifier DOT tab=identifier -> ^(TOK_TABNAME $db $tab) | @@ -227,8 +230,14 @@ viewName @init { gParent.pushMsg("view name", state); } @after { gParent.popMsg(state); } : - (db=identifier DOT)? view=identifier - -> ^(TOK_TABNAME $db? 
$view) + (identifier DOT identifier DOT identifier) => cat=identifier DOT db=identifier DOT view=identifier + -> ^(TOK_TABNAME $cat $db $view) + | + db=identifier DOT view=identifier + -> ^(TOK_TABNAME $db $view) + | + view=identifier + -> ^(TOK_TABNAME $view) ; subQuerySource diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java index 799cd09ad6dc2..b24d6b97f131e 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java @@ -427,9 +427,11 @@ public Operation convertToOperation(HiveParserASTNode ast) throws SemanticExcept private Operation convertAlterTable(HiveParserASTNode input) throws SemanticException { Operation operation = null; HiveParserASTNode ast = (HiveParserASTNode) input.getChild(1); + ObjectIdentifier tableIdentifier = + HiveParserBaseSemanticAnalyzer.getObjectIdentifier( + catalogManager, (HiveParserASTNode) input.getChild(0)); String[] qualified = - HiveParserBaseSemanticAnalyzer.getQualifiedTableName( - (HiveParserASTNode) input.getChild(0)); + new String[] {tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()}; String tableName = HiveParserBaseSemanticAnalyzer.getDotName(qualified); HashMap partSpec = null; HiveParserASTNode partSpecNode = (HiveParserASTNode) input.getChild(2); @@ -680,7 +682,8 @@ private ExprNodeDesc getBody( HiveParserASTNode ast, List arguments, HiveParserRowResolver rowResolver) throws SemanticException { HiveParserSemanticAnalyzer semanticAnalyzer = - new HiveParserSemanticAnalyzer(queryState, hiveShim, frameworkConfig, cluster); + new HiveParserSemanticAnalyzer( + queryState, frameworkConfig, cluster, catalogManager); return arguments.isEmpty() ? 
semanticAnalyzer.genExprNodeDesc((HiveParserASTNode) ast.getChild(1), rowResolver) : semanticAnalyzer.genExprNodeDesc( @@ -702,9 +705,11 @@ private Operation convertDropMacro(HiveParserASTNode ast) throws SemanticExcepti } private Operation convertShowCreateTable(HiveParserASTNode ast) throws SemanticException { + ObjectIdentifier identifier = + HiveParserBaseSemanticAnalyzer.getObjectIdentifier( + catalogManager, (HiveParserASTNode) ast.getChild(0)); String[] qualTabName = - HiveParserBaseSemanticAnalyzer.getQualifiedTableName( - (HiveParserASTNode) ast.getChild(0)); + new String[] {identifier.getDatabaseName(), identifier.getObjectName()}; ObjectPath tablePath = new ObjectPath(qualTabName[0], qualTabName[1]); if (!isHive310OrLater()) { // before hive3, Hive will check the table type is index table or not @@ -725,9 +730,11 @@ private boolean isHive310OrLater() { private Operation convertAlterView(HiveParserASTNode ast) throws SemanticException { Operation operation = null; + ObjectIdentifier tableIdentifier = + HiveParserBaseSemanticAnalyzer.getObjectIdentifier( + catalogManager, (HiveParserASTNode) ast.getChild(0)); String[] qualified = - HiveParserBaseSemanticAnalyzer.getQualifiedTableName( - (HiveParserASTNode) ast.getChild(0)); + new String[] {tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()}; String tableName = HiveParserBaseSemanticAnalyzer.getDotName(qualified); CatalogBaseTable alteredTable = getAlteredTable(tableName, true); if (ast.getChild(1).getType() == HiveASTParser.TOK_QUERY) { @@ -759,9 +766,11 @@ private Operation convertAlterView(HiveParserASTNode ast) throws SemanticExcepti } private Operation convertCreateView(HiveParserASTNode ast) throws SemanticException { + ObjectIdentifier tableIdentifier = + HiveParserBaseSemanticAnalyzer.getObjectIdentifier( + catalogManager, (HiveParserASTNode) ast.getChild(0)); String[] qualTabName = - HiveParserBaseSemanticAnalyzer.getQualifiedTableName( - (HiveParserASTNode) ast.getChild(0)); + new String[] {tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()}; String dbDotTable = HiveParserBaseSemanticAnalyzer.getDotName(qualTabName); List cols = null; boolean ifNotExists = false; @@ -834,7 +843,7 @@ private Operation convertCreateView(HiveParserASTNode ast) throws SemanticExcept HiveParserCreateViewInfo createViewInfo = new HiveParserCreateViewInfo(dbDotTable, cols, selectStmt); - hiveParser.analyzeCreateView(createViewInfo, context, queryState, hiveShim); + hiveParser.analyzeCreateView(createViewInfo, context, queryState); ObjectIdentifier viewIdentifier = parseObjectIdentifier(createViewInfo.getCompoundName()); TableSchema schema = @@ -868,9 +877,11 @@ private Operation convertCreateView(HiveParserASTNode ast) throws SemanticExcept } private Operation convertCreateTable(HiveParserASTNode ast) throws SemanticException { + ObjectIdentifier tableIdentifier = + HiveParserBaseSemanticAnalyzer.getObjectIdentifier( + catalogManager, (HiveParserASTNode) ast.getChild(0)); String[] qualifiedTabName = - HiveParserBaseSemanticAnalyzer.getQualifiedTableName( - (HiveParserASTNode) ast.getChild(0)); + new String[] {tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()}; String dbDotTab = HiveParserBaseSemanticAnalyzer.getDotName(qualifiedTabName); String likeTableName; @@ -1048,19 +1059,25 @@ private Operation convertCreateTable(HiveParserASTNode ast) throws SemanticExcep // analyze the query HiveParserCalcitePlanner calcitePlanner = - hiveParser.createCalcitePlanner(context, queryState, hiveShim); + 
hiveParser.createCalcitePlanner(context, queryState); calcitePlanner.setCtasCols(cols); RelNode queryRelNode = calcitePlanner.genLogicalPlan(selectStmt); - // create a table to represent the dest table - String[] dbTblName = dbDotTab.split("\\."); - Table destTable = new Table(Table.getEmptyTable(dbTblName[0], dbTblName[1])); - destTable.getSd().setCols(cols); + TableSchema tableSchema = + HiveTableUtil.createTableSchema( + cols, partCols, Collections.emptySet(), null); + CatalogTable destTable = + new CatalogTableImpl( + tableSchema, + HiveCatalog.getFieldNames(partCols), + tblProps, + comment); Tuple4, Boolean> insertOperationInfo = dmlHelper.createInsertOperationInfo( queryRelNode, destTable, + tableIdentifier, Collections.emptyMap(), Collections.emptyList(), false); @@ -1253,7 +1270,7 @@ private Operation convertAlterDatabaseProperties(HiveParserASTNode ast) { return new AlterDatabaseOperation(catalogManager.getCurrentCatalog(), dbName, newDB); } - private Operation convertAlterDatabaseOwner(HiveParserASTNode ast) { + private Operation convertAlterDatabaseOwner(HiveParserASTNode ast) throws SemanticException { String dbName = HiveParserBaseSemanticAnalyzer.getUnescapedName( (HiveParserASTNode) ast.getChild(0)); @@ -1278,7 +1295,7 @@ private Operation convertAlterDatabaseOwner(HiveParserASTNode ast) { return new AlterDatabaseOperation(catalogManager.getCurrentCatalog(), dbName, newDB); } - private Operation convertAlterDatabaseLocation(HiveParserASTNode ast) { + private Operation convertAlterDatabaseLocation(HiveParserASTNode ast) throws SemanticException { String dbName = HiveParserBaseSemanticAnalyzer.getUnescapedName( (HiveParserASTNode) ast.getChild(0)); @@ -1363,7 +1380,8 @@ private Operation convertSwitchDatabase(HiveParserASTNode ast) { return new UseDatabaseOperation(catalogManager.getCurrentCatalog(), dbName); } - private Operation convertDropTable(HiveParserASTNode ast, TableType expectedType) { + private Operation convertDropTable(HiveParserASTNode ast, TableType expectedType) + throws SemanticException { String tableName = HiveParserBaseSemanticAnalyzer.getUnescapedName( (HiveParserASTNode) ast.getChild(0)); @@ -1681,9 +1699,15 @@ private Operation convertDescribeTable(HiveParserASTNode ast) { tableNode = (HiveParserASTNode) tableTypeExpr.getChild(0); if (tableNode.getChildCount() == 1) { tableName = tableNode.getChild(0).getText(); - } else { + } else if (tableNode.getChildCount() == 2) { dbName = tableNode.getChild(0).getText(); tableName = dbName + "." + tableNode.getChild(1).getText(); + } else { + // tablemname is CATALOGNAME.DBNAME.TABLENAME, which is not supported yet. 
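Aside: the DESCRIBE handling above dispatches on the number of name parts and, for now, rejects catalog-qualified names. A plain-string sketch of that dispatch (the error text is paraphrased, not the exact message thrown by the patch):

import java.util.Arrays;
import java.util.List;

class DescribeTargetSketch {
    /** "t" and "db.t" are accepted; "catalog.db.t" is rejected, as in the branch above. */
    static String resolveDescribeTarget(List<String> nameParts) {
        if (nameParts.size() == 1) {
            return nameParts.get(0);
        } else if (nameParts.size() == 2) {
            return nameParts.get(0) + "." + nameParts.get(1);
        } else {
            // paraphrased; the patch throws a ValidationException here
            throw new UnsupportedOperationException(
                    "Describing a table in a specific catalog is not supported by the Hive dialect yet.");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveDescribeTarget(Arrays.asList("t1")));        // t1
        System.out.println(resolveDescribeTarget(Arrays.asList("db1", "t1"))); // db1.t1
        try {
            resolveDescribeTarget(Arrays.asList("cat", "db", "t1"));
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}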
+ // todo: fix it in FLINK-29343 + throw new ValidationException( + "Describe a table in specific catalog is not supported in HiveDialect," + + " please switch to Flink default dialect."); } } else { throw new ValidationException( @@ -1752,7 +1776,7 @@ private List> getPartitionSpecs(CommonTree ast) { return partSpecs; } - private Operation convertShowPartitions(HiveParserASTNode ast) { + private Operation convertShowPartitions(HiveParserASTNode ast) throws SemanticException { String tableName = HiveParserBaseSemanticAnalyzer.getUnescapedName( (HiveParserASTNode) ast.getChild(0)); @@ -1834,9 +1858,11 @@ private Operation convertShowFunctions(HiveParserASTNode ast) { private Operation convertAlterTableRename( String sourceName, HiveParserASTNode ast, boolean expectView) throws SemanticException { + ObjectIdentifier tableIdentifier = + HiveParserBaseSemanticAnalyzer.getObjectIdentifier( + catalogManager, (HiveParserASTNode) ast.getChild(0)); String[] target = - HiveParserBaseSemanticAnalyzer.getQualifiedTableName( - (HiveParserASTNode) ast.getChild(0)); + new String[] {tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()}; String targetName = HiveParserBaseSemanticAnalyzer.getDotName(target); ObjectIdentifier objectIdentifier = parseObjectIdentifier(sourceName); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java index 48913291703a9..cf57cff2c92a2 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java @@ -18,7 +18,10 @@ package org.apache.flink.table.planner.delegation.hive.parse; +import org.apache.flink.connectors.hive.FlinkHiveException; +import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.TableSpec; import org.apache.flink.table.planner.delegation.hive.operations.HiveLoadDataOperation; @@ -39,6 +42,8 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.mapred.InputFormat; @@ -60,9 +65,13 @@ public class HiveParserLoadSemanticAnalyzer { private final Hive db; private final FrameworkConfig frameworkConfig; private final RelOptCluster cluster; + private final CatalogManager catalogManager; public HiveParserLoadSemanticAnalyzer( - HiveConf conf, FrameworkConfig frameworkConfig, RelOptCluster cluster) + HiveConf conf, + FrameworkConfig frameworkConfig, + RelOptCluster cluster, + CatalogManager catalogManager) throws SemanticException { this.conf = conf; try { @@ -72,6 +81,7 @@ public HiveParserLoadSemanticAnalyzer( } this.frameworkConfig = frameworkConfig; this.cluster = cluster; + 
this.catalogManager = catalogManager; } public HiveLoadDataOperation convertToOperation(HiveParserASTNode ast) @@ -104,26 +114,49 @@ public HiveLoadDataOperation convertToOperation(HiveParserASTNode ast) } // initialize destination table/partition - TableSpec ts = new TableSpec(db, conf, tableTree, frameworkConfig, cluster); + TableSpec ts = new TableSpec(catalogManager, conf, tableTree, frameworkConfig, cluster); + if (!HiveCatalog.isHiveTable(ts.table.getOptions())) { + throw new UnsupportedOperationException( + "Load data into non-hive table is not supported yet."); + } + if (!ts.tableIdentifier.getCatalogName().equals(catalogManager.getCurrentCatalog())) { + throw new UnsupportedOperationException( + String.format( + "Load data into a table which isn't in current catalog is not supported yet." + + " The table's catalog is %s, but the current catalog is %s.", + ts.tableIdentifier.getCatalogName(), + catalogManager.getCurrentCatalog())); + } + Table table; + try { + table = + db.getTable( + ts.tableIdentifier.getDatabaseName(), + ts.tableIdentifier.getObjectName()); + } catch (HiveException e) { + throw new FlinkHiveException( + String.format("Fail to get table %s.", ts.tableIdentifier.asSummaryString()), + e); + } - if (ts.tableHandle.isView() || ts.tableHandle.isMaterializedView()) { + if (table.isView() || table.isMaterializedView()) { throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg()); } - if (ts.tableHandle.isNonNative()) { + if (table.isNonNative()) { throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg()); } - if (ts.tableHandle.isStoredAsSubDirectories()) { + if (table.isStoredAsSubDirectories()) { throw new SemanticException(ErrorMsg.LOAD_INTO_STORED_AS_DIR.getMsg()); } - List parts = ts.tableHandle.getPartitionKeys(); + List parts = table.getPartitionKeys(); if ((parts != null && parts.size() > 0) && (ts.partSpec == null || ts.partSpec.size() == 0)) { throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg()); } - List bucketCols = ts.tableHandle.getBucketCols(); + List bucketCols = table.getBucketCols(); if (bucketCols != null && !bucketCols.isEmpty()) { String error = HiveConf.StrictChecks.checkBucketing(conf); if (error != null) { @@ -139,14 +172,14 @@ public HiveLoadDataOperation convertToOperation(HiveParserASTNode ast) List files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal); // for managed tables, make sure the file formats match - if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType()) + if (TableType.MANAGED_TABLE.equals(table.getTableType()) && conf.getBoolVar(HiveConf.ConfVars.HIVECHECKFILEFORMAT)) { - ensureFileFormatsMatch(ts, files, fromURI); + ensureFileFormatsMatch(ts, table, files, fromURI); } return new HiveLoadDataOperation( new Path(fromURI), - new ObjectPath(ts.tableHandle.getDbName(), ts.tableHandle.getTableName()), + new ObjectPath(table.getDbName(), table.getTableName()), isOverWrite, isLocal, ts.partSpec == null ? 
new LinkedHashMap<>() : ts.partSpec); @@ -266,14 +299,18 @@ private URI initializeFromURI(String fromPath, boolean isLocal) } private void ensureFileFormatsMatch( - TableSpec ts, List fileStatuses, final URI fromURI) + TableSpec ts, Table table, List fileStatuses, final URI fromURI) throws SemanticException { final Class destInputFormat; try { if (ts.getPartSpec() == null || ts.getPartSpec().isEmpty()) { - destInputFormat = ts.tableHandle.getInputFormatClass(); + destInputFormat = table.getInputFormatClass(); } else { - destInputFormat = ts.partHandle.getInputFormatClass(); + Partition partition = db.getPartition(table, ts.partSpec, false); + if (partition == null) { + partition = new Partition(table, ts.partSpec, null); + } + destInputFormat = partition.getInputFormatClass(); } } catch (HiveException e) { throw new SemanticException(e); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java index 8b8492882dfd1..9d7a87eb3e03b 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java @@ -21,6 +21,8 @@ import org.apache.flink.table.HiveVersionTestUtil; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.catalog.hive.client.HiveShim; @@ -45,8 +47,10 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.ComparisonFailure; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.BufferedReader; import java.io.File; @@ -68,6 +72,8 @@ /** Test hive query compatibility. 
*/ public class HiveDialectQueryITCase { + @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); + private static final String QTEST_DIR = Thread.currentThread().getContextClassLoader().getResource("query-test").getPath(); private static final String SORT_QUERY_RESULTS = "SORT_QUERY_RESULTS"; @@ -846,6 +852,65 @@ public void testCount() throws Exception { } } + @Test + public void testCrossCatalogQueryNoHiveTable() throws Exception { + // register a new in-memory catalog + Catalog inMemoryCatalog = new GenericInMemoryCatalog("m_catalog", "db"); + tableEnv.registerCatalog("m_catalog", inMemoryCatalog); + tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + // create a non-hive table + tableEnv.executeSql( + String.format( + "create table m_catalog.db.t1(x int, y string) " + + "with ('connector' = 'filesystem', 'path' = '%s', 'format'='csv')", + tempFolder.newFolder().toURI())); + // create a non-hive partitioned table + tableEnv.executeSql( + String.format( + "create table m_catalog.db.t2(x int, p1 int,p2 string) partitioned by (p1, p2) " + + "with ('connector' = 'filesystem', 'path' = '%s', 'format'='csv')", + tempFolder.newFolder().toURI())); + + tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); + // create a hive table + tableEnv.executeSql("create table t1(x int, y string)"); + + try { + // insert data into the non-hive table and hive table + tableEnv.executeSql("insert into m_catalog.db.t1 values (1, 'v1'), (2, 'v2')").await(); + tableEnv.executeSql( + "insert into m_catalog.db.t2 partition (p1=0,p2='static') values (1), (2), (1)") + .await(); + tableEnv.executeSql("insert into t1 values (1, 'h1'), (4, 'h2')").await(); + // query a non-hive table + List result = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select * from m_catalog.db.t1 sort by x desc") + .collect()); + assertThat(result.toString()).isEqualTo("[+I[2, v2], +I[1, v1]]"); + // query a non-hive partitioned table + result = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select * from m_catalog.db.t2 cluster by x") + .collect()); + assertThat(result.toString()) + .isEqualTo("[+I[1, 0, static], +I[1, 0, static], +I[2, 0, static]]"); + // join a table using a hive table and a non-hive table + result = + CollectionUtil.iteratorToList( + tableEnv.executeSql( + "select ht1.x, ht1.y from m_catalog.db.t1 as mt1 join t1 as ht1 using (x)") + .collect()); + assertThat(result.toString()).isEqualTo("[+I[1, h1]]"); + } finally { + tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + tableEnv.executeSql("drop table m_catalog.db.t1"); + tableEnv.executeSql("drop table m_catalog.db.t2"); + tableEnv.executeSql("drop table t1"); + tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); + } + } + private void runQFile(File qfile) throws Exception { QTest qTest = extractQTest(qfile); for (int i = 0; i < qTest.statements.size(); i++) {
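Aside: a hedged usage sketch of what the new ITCase exercises, namely querying a table that lives in a non-Hive catalog while the Hive dialect is active. It reuses only APIs that already appear in the test above; the HiveCatalog bootstrap is assumed and elided (so this is not runnable standalone), and the datagen connector is substituted for the filesystem table used in the test:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class CrossCatalogHiveDialectSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // assumption: a HiveCatalog has already been registered and made the current catalog
        // (the ITCase above does this via HiveTestUtils); without it the Hive dialect cannot run

        // register a non-Hive catalog holding a plain connector table
        tEnv.registerCatalog("m_catalog", new GenericInMemoryCatalog("m_catalog", "db"));
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
                "create table m_catalog.db.t1(x int, y string) "
                        + "with ('connector' = 'datagen', 'number-of-rows' = '5')");

        // with the Hive dialect active, the fully qualified catalog.db.table reference
        // below is now resolved through the CatalogManager rather than the Hive metastore
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql("select * from m_catalog.db.t1").print();
    }
}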