From 7206ed1163d532a05431e430e4050f46c29cd3b6 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Tue, 30 Aug 2022 16:34:38 +0800
Subject: [PATCH] [HUDI-4740] Add metadata fields for hive catalog #createTable (#6541)

---
 .../hudi/table/catalog/HiveSchemaUtils.java   | 21 +++++++++++--
 .../hudi/table/catalog/HoodieHiveCatalog.java | 12 ++++---
 .../table/catalog/TestHoodieHiveCatalog.java  | 31 ++++++++++++++++++-
 3 files changed, 56 insertions(+), 8 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index ea965f5c01e20..6c1d0e9ec240c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -55,7 +56,11 @@ public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
   }
 
   public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
-    List<FieldSchema> allCols = new ArrayList<>(hiveTable.getSd().getCols());
+    List<FieldSchema> allCols = hiveTable.getSd().getCols().stream()
+        // filter out the metadata columns
+        .filter(s -> !HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(s.getName()))
+        .collect(Collectors.toList());
+    // need to refactor the partition key field positions: they are not always at the end
     allCols.addAll(hiveTable.getPartitionKeys());
 
     String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
@@ -167,8 +172,18 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
     }
   }
 
-  /** Create Hive columns from Flink TableSchema. */
-  public static List<FieldSchema> createHiveColumns(TableSchema schema) {
+  /** Create Hive field schemas from the Flink table schema, including the hoodie metadata fields. */
+  public static List<FieldSchema> toHiveFieldSchema(TableSchema schema) {
+    List<FieldSchema> columns = new ArrayList<>();
+    for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) {
+      columns.add(new FieldSchema(metaField, "string", null));
+    }
+    columns.addAll(createHiveColumns(schema));
+    return columns;
+  }
+
+  /** Create Hive columns from the Flink table schema. */
+  private static List<FieldSchema> createHiveColumns(TableSchema schema) {
     final DataType dataType = schema.toPersistedRowDataType();
     final RowType rowType = (RowType) dataType.getLogicalType();
     final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 07f62911457ab..491958f31ebd3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -536,7 +536,11 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
 
     //set sd
     StorageDescriptor sd = new StorageDescriptor();
-    List<FieldSchema> allColumns = HiveSchemaUtils.createHiveColumns(table.getSchema());
+    // the metadata fields should be included to keep in sync with the hive sync tool:
+    // since Hive 3.x the schema is validated when altering a table, so if the metadata
+    // fields were only added later through the hive sync tool, a compatibility issue
+    // would be reported.
+    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema());
 
     // Table columns and partition keys
     CatalogTable catalogTable = (CatalogTable) table;
@@ -893,9 +897,9 @@ private Map<String, String> supplementOptions(
     } else {
       Map<String, String> newOptions = new HashMap<>(options);
       // set up hive sync options
-      newOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
-      newOptions.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
-      newOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
+      newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
+      newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
+      newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
       newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
       newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName());
       newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 66ba520af9587..2a731143a07b8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.exception.HoodieCatalogException;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -50,6 +51,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -103,15 +105,42 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Except
         new CatalogTableImpl(schema, partitions, options, "hudi table");
     hoodieCatalog.createTable(tablePath, table, false);
 
+    // validate hive table
+    Table hiveTable = hoodieCatalog.getHiveTable(tablePath);
+    String fieldSchema = hiveTable.getSd().getCols().stream()
+        .map(f -> f.getName() + ":" + f.getType())
+        .collect(Collectors.joining(","));
+    String expectedFieldSchema = ""
+        + "_hoodie_commit_time:string,"
+        + "_hoodie_commit_seqno:string,"
+        + "_hoodie_record_key:string,"
+        + "_hoodie_partition_path:string,"
+        + "_hoodie_file_name:string,"
+        + "uuid:int,"
+        + "name:string,"
+        + "age:int,"
+        + "ts:bigint";
+    assertEquals(expectedFieldSchema, fieldSchema);
+    String partitionSchema = hiveTable.getPartitionKeys().stream()
+        .map(f -> f.getName() + ":" + f.getType())
+        .collect(Collectors.joining(","));
+    assertEquals("par1:string", partitionSchema);
+
+    // validate catalog table
     CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
     assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));
     assertEquals(tableType.toString(), table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()));
     assertEquals("uuid", table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
     assertNull(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "preCombine key is not declared");
+    String tableSchema = table1.getUnresolvedSchema().getColumns().stream()
+        .map(Schema.UnresolvedColumn::toString)
+        .collect(Collectors.joining(","));
+    String expectedTableSchema = "`uuid` INT NOT NULL,`name` STRING,`age` INT,`par1` STRING,`ts` BIGINT";
+    assertEquals(expectedTableSchema, tableSchema);
     assertEquals(Collections.singletonList("uuid"), table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
     assertEquals(Collections.singletonList("par1"), ((CatalogTable) table1).getPartitionKeys());
 
-    // test explicit primary key
+    // validate explicit primary key
     options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "id");
     table = new CatalogTableImpl(schema, partitions, options, "hudi table");
     hoodieCatalog.alterTable(tablePath, table, true);
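
For context on the supplementOptions change above: switching put(...) to putIfAbsent(...) means hive sync options the user set explicitly are no longer overwritten by the catalog defaults. Below is a minimal, illustrative sketch of that semantics; it is not part of the patch, and the literal "hive_sync.*" keys are assumed to match the FlinkOptions keys referenced in the diff.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative demo of the putIfAbsent behavior that
    // HoodieHiveCatalog#supplementOptions relies on after this change.
    public class SupplementOptionsSketch {
      public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("hive_sync.mode", "jdbc"); // the user explicitly chose jdbc

        Map<String, String> newOptions = new HashMap<>(options);
        newOptions.putIfAbsent("hive_sync.enabled", "true"); // absent -> default is filled in
        newOptions.putIfAbsent("hive_sync.mode", "hms");     // present -> user's "jdbc" is kept

        // prints both entries; hive_sync.mode remains "jdbc"
        System.out.println(newOptions);
      }
    }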