From 6b0655b4a4cff5abbf5c92bb2c64aae5826b848d Mon Sep 17 00:00:00 2001 From: chao chen <59957056+waywtdcc@users.noreply.github.com> Date: Mon, 7 Nov 2022 10:13:57 +0800 Subject: [PATCH] [HUDI-5088]Fix bug:Failed to synchronize the hive metadata of the Flink table (#7056) * sync `_hoodie_operation` meta field if changelog mode is enabled. --- .../org/apache/hudi/table/catalog/HiveSchemaUtils.java | 8 ++++++-- .../org/apache/hudi/table/catalog/HoodieHiveCatalog.java | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java index a057c02f2cca3..4383b42e9f8d9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; @@ -178,9 +179,12 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) { /** * Create Hive field schemas from Flink table schema including the hoodie metadata fields. */ - public static List toHiveFieldSchema(TableSchema schema) { + public static List toHiveFieldSchema(TableSchema schema, boolean withOperationField) { List columns = new ArrayList<>(); - for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) { + Collection metaFields = withOperationField + ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence + : HoodieRecord.HOODIE_META_COLUMNS; + for (String metaField : metaFields) { columns.add(new FieldSchema(metaField, "string", null)); } columns.addAll(createHiveColumns(schema)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java index d6e70f16eaf7c..85f82d53d9102 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java @@ -553,7 +553,8 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, // because since Hive 3.x, there is validation when altering table, // when the metadata fields are synced through the hive sync tool, // a compatability issue would be reported. - List allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema()); + boolean withOperationField = Boolean.parseBoolean(table.getOptions().getOrDefault(FlinkOptions.CHANGELOG_ENABLED.key(), "false")); + List allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField); // Table columns and partition keys CatalogTable catalogTable = (CatalogTable) table;