Commit
[HUDI-5088] Fix bug: Failed to synchronize the hive metadata of the Flink table (apache#7056)

* sync `_hoodie_operation` meta field if changelog mode is enabled.
waywtdcc authored and fengjian committed Apr 5, 2023
1 parent 73cc4b8 commit 6b0655b
Showing 2 changed files with 8 additions and 3 deletions.
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -178,9 +179,12 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
   /**
    * Create Hive field schemas from Flink table schema including the hoodie metadata fields.
    */
-  public static List<FieldSchema> toHiveFieldSchema(TableSchema schema) {
+  public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) {
     List<FieldSchema> columns = new ArrayList<>();
-    for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) {
+    Collection<String> metaFields = withOperationField
+        ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence
+        : HoodieRecord.HOODIE_META_COLUMNS;
+    for (String metaField : metaFields) {
       columns.add(new FieldSchema(metaField, "string", null));
     }
     columns.addAll(createHiveColumns(schema));
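
To make the effect of the new overload concrete, here is a minimal, self-contained sketch of the column selection it performs. The meta column names mirror Hudi's HoodieRecord constants, but the class and helper below are illustrative stand-ins (plain strings instead of Hive FieldSchema objects), not the actual HiveSchemaUtils code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for the new toHiveFieldSchema overload: it only shows
// which meta columns get synced to Hive for each value of withOperationField.
public class MetaColumnSelectionSketch {

  // Mirrors HoodieRecord.HOODIE_META_COLUMNS (the standard Hudi meta fields).
  private static final List<String> HOODIE_META_COLUMNS = Arrays.asList(
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name");

  // The extra meta field written when Flink changelog mode is enabled.
  private static final String OPERATION_FIELD = "_hoodie_operation";

  static List<String> hiveMetaColumns(boolean withOperationField) {
    List<String> columns = new ArrayList<>(HOODIE_META_COLUMNS);
    if (withOperationField) {
      // The real code switches to HOODIE_META_COLUMNS_WITH_OPERATION (a set,
      // as the diff's comment cautions); here the field is simply appended.
      columns.add(OPERATION_FIELD);
    }
    return columns;
  }

  public static void main(String[] args) {
    System.out.println(hiveMetaColumns(false)); // five standard meta columns
    System.out.println(hiveMetaColumns(true));  // plus _hoodie_operation
  }
}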
@@ -553,7 +553,8 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
     // because since Hive 3.x, there is validation when altering table,
     // when the metadata fields are synced through the hive sync tool,
     // a compatability issue would be reported.
-    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema());
+    boolean withOperationField = Boolean.parseBoolean(table.getOptions().getOrDefault(FlinkOptions.CHANGELOG_ENABLED.key(), "false"));
+    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField);
 
     // Table columns and partition keys
     CatalogTable catalogTable = (CatalogTable) table;
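
On the catalog side, the flag is derived purely from the table options. The sketch below shows that parsing in isolation; it assumes the option key string "changelog.enabled" is what FlinkOptions.CHANGELOG_ENABLED.key() resolves to and uses a plain Map in place of CatalogBaseTable#getOptions().

import java.util.HashMap;
import java.util.Map;

// Illustrative only: a plain Map stands in for CatalogBaseTable#getOptions(),
// and "changelog.enabled" is assumed to be FlinkOptions.CHANGELOG_ENABLED.key().
public class ChangelogOptionSketch {
  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("changelog.enabled", "true");

    // Same parsing as the patched HiveCatalog: a missing or non-"true" value
    // defaults to false, so tables without changelog mode keep the old schema.
    boolean withOperationField =
        Boolean.parseBoolean(options.getOrDefault("changelog.enabled", "false"));

    System.out.println("sync _hoodie_operation: " + withOperationField); // true
  }
}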
