[HUDI-4740] Add metadata fields for hive catalog #createTable (apache…
danny0405 authored and fengjian committed Apr 5, 2023
1 parent e2a4e04 commit 7206ed1
Showing 3 changed files with 56 additions and 8 deletions.
@@ -18,6 +18,7 @@

package org.apache.hudi.table.catalog;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
@@ -55,7 +56,11 @@ public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
}

public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
List<FieldSchema> allCols = new ArrayList<>(hiveTable.getSd().getCols());
List<FieldSchema> allCols = hiveTable.getSd().getCols().stream()
// filter out the metadata columns
.filter(s -> !HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(s.getName()))
.collect(Collectors.toList());
// need to refactor the partition key field positions: they are not always at the end
allCols.addAll(hiveTable.getPartitionKeys());

String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
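
As an aside (not part of this commit), here is a minimal sketch of what the new filter achieves when a table created by this catalog is read back: the stored Hive columns now carry the metadata fields up front, and convertTableSchema drops them so the Flink catalog schema only exposes the user-defined columns. The class name and the hard-coded column list below are illustrative; HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS is the same lookup used in the diff.

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hudi.common.model.HoodieRecord;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterMetaColumnsSketch {
  public static void main(String[] args) {
    // Columns as stored in the Hive metastore after this change:
    // the metadata fields come first, followed by the data fields.
    List<FieldSchema> hiveCols = Arrays.asList(
        new FieldSchema("_hoodie_commit_time", "string", null),
        new FieldSchema("_hoodie_record_key", "string", null),
        new FieldSchema("uuid", "int", null),
        new FieldSchema("name", "string", null));

    // Same filter as convertTableSchema: drop the metadata columns so the
    // Flink catalog schema only contains user-defined fields.
    List<String> userCols = hiveCols.stream()
        .filter(s -> !HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(s.getName()))
        .map(FieldSchema::getName)
        .collect(Collectors.toList());

    System.out.println(userCols); // [uuid, name]
  }
}
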
@@ -167,8 +172,18 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
}
}

/** Create Hive columns from Flink TableSchema. */
public static List<FieldSchema> createHiveColumns(TableSchema schema) {
/** Create Hive field schemas from Flink table schema including the hoodie metadata fields. */
public static List<FieldSchema> toHiveFieldSchema(TableSchema schema) {
List<FieldSchema> columns = new ArrayList<>();
for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) {
columns.add(new FieldSchema(metaField, "string", null));
}
columns.addAll(createHiveColumns(schema));
return columns;
}

/** Create Hive columns from Flink table schema. */
private static List<FieldSchema> createHiveColumns(TableSchema schema) {
final DataType dataType = schema.toPersistedRowDataType();
final RowType rowType = (RowType) dataType.getLogicalType();
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
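
For context, a minimal standalone sketch (not from the commit) of the column ordering toHiveFieldSchema produces when a table is created through the catalog: the five Hudi metadata columns, all typed string, followed by the table's own columns; this is the ordering the updated test below asserts. The class name and the sample data columns are illustrative.

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hudi.common.model.HoodieRecord;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MetaColumnOrderingSketch {
  public static void main(String[] args) {
    // Stand-in for the output of createHiveColumns(schema) for the test table.
    List<FieldSchema> dataColumns = Arrays.asList(
        new FieldSchema("uuid", "int", null),
        new FieldSchema("name", "string", null),
        new FieldSchema("age", "int", null),
        new FieldSchema("ts", "bigint", null));

    // Mirrors toHiveFieldSchema: metadata columns first, all typed as string.
    List<FieldSchema> columns = new ArrayList<>();
    for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) {
      columns.add(new FieldSchema(metaField, "string", null));
    }
    columns.addAll(dataColumns);

    // Prints each column as name:type, in order: _hoodie_commit_time,
    // _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path,
    // _hoodie_file_name, then uuid, name, age, ts.
    columns.forEach(f -> System.out.println(f.getName() + ":" + f.getType()));
  }
}
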
@@ -536,7 +536,11 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,

//set sd
StorageDescriptor sd = new StorageDescriptor();
List<FieldSchema> allColumns = HiveSchemaUtils.createHiveColumns(table.getSchema());
// the metadata fields should be included to stay consistent with the hive sync tool,
// because since Hive 3.x there is validation when altering a table:
// if the metadata fields were later synced through the hive sync tool,
// a compatibility issue would be reported.
List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema());

// Table columns and partition keys
CatalogTable catalogTable = (CatalogTable) table;
@@ -893,9 +897,9 @@ private Map<String, String> supplementOptions(
} else {
Map<String, String> newOptions = new HashMap<>(options);
// set up hive sync options
newOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
newOptions.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
newOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName());
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName());
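
A small self-contained sketch (plain java.util.Map, with illustrative literal keys standing in for FlinkOptions.*.key()) of why put was switched to putIfAbsent here: hive sync defaults supplied by the catalog no longer override options the user set explicitly on the table.

import java.util.HashMap;
import java.util.Map;

public class PutIfAbsentSketch {
  public static void main(String[] args) {
    // The user explicitly disabled hive sync in the table options.
    Map<String, String> options = new HashMap<>();
    options.put("hive_sync.enabled", "false"); // illustrative key string

    Map<String, String> newOptions = new HashMap<>(options);
    // Before this commit, put(...) would silently flip the flag back to "true";
    // putIfAbsent(...) only supplies a default when the key is missing.
    newOptions.putIfAbsent("hive_sync.enabled", "true");
    newOptions.putIfAbsent("hive_sync.mode", "hms"); // illustrative key string

    System.out.println(newOptions.get("hive_sync.enabled")); // false (user choice kept)
    System.out.println(newOptions.get("hive_sync.mode"));    // hms (default filled in)
  }
}
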
@@ -23,6 +23,7 @@
import org.apache.hudi.exception.HoodieCatalogException;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
@@ -50,6 +51,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -103,15 +105,42 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Exception {
new CatalogTableImpl(schema, partitions, options, "hudi table");
hoodieCatalog.createTable(tablePath, table, false);

// validate hive table
Table hiveTable = hoodieCatalog.getHiveTable(tablePath);
String fieldSchema = hiveTable.getSd().getCols().stream()
.map(f -> f.getName() + ":" + f.getType())
.collect(Collectors.joining(","));
String expectedFieldSchema = ""
+ "_hoodie_commit_time:string,"
+ "_hoodie_commit_seqno:string,"
+ "_hoodie_record_key:string,"
+ "_hoodie_partition_path:string,"
+ "_hoodie_file_name:string,"
+ "uuid:int,"
+ "name:string,"
+ "age:int,"
+ "ts:bigint";
assertEquals(expectedFieldSchema, fieldSchema);
String partitionSchema = hiveTable.getPartitionKeys().stream()
.map(f -> f.getName() + ":" + f.getType())
.collect(Collectors.joining(","));
assertEquals("par1:string", partitionSchema);

// validate catalog table
CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));
assertEquals(tableType.toString(), table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()));
assertEquals("uuid", table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
assertNull(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "preCombine key is not declared");
String tableSchema = table1.getUnresolvedSchema().getColumns().stream()
.map(Schema.UnresolvedColumn::toString)
.collect(Collectors.joining(","));
String expectedTableSchema = "`uuid` INT NOT NULL,`name` STRING,`age` INT,`par1` STRING,`ts` BIGINT";
assertEquals(expectedTableSchema, tableSchema);
assertEquals(Collections.singletonList("uuid"), table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
assertEquals(Collections.singletonList("par1"), ((CatalogTable)table1).getPartitionKeys());

// test explicit primary key
// validate explicit primary key
options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "id");
table = new CatalogTableImpl(schema, partitions, options, "hudi table");
hoodieCatalog.alterTable(tablePath, table, true);
