[HUDI-4326] Fix hive sync serde properties (apache#6722)
xushiyan authored and voonhous committed Oct 7, 2022
1 parent 13dce4c commit fe8c6d4
Showing 4 changed files with 23 additions and 32 deletions.
HiveSyncTool.java
@@ -290,9 +290,7 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
       // Sync the table properties if the schema has changed
       if (config.getString(HIVE_TABLE_PROPERTIES) != null || config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) {
         syncClient.updateTableProperties(tableName, tableProperties);
-        HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
-        String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
-        syncClient.updateTableSerDeInfo(tableName, serDeFormatClassName, serdeProperties);
+        syncClient.updateSerdeProperties(tableName, serdeProperties);
         LOG.info("Sync table properties for " + tableName + ", table properties is: " + tableProperties);
       }
       schemaChanged = true;
HoodieHiveSyncClient.java
@@ -18,11 +18,14 @@

 package org.apache.hudi.hive;

+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.MapUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hive.ddl.DDLExecutor;
 import org.apache.hudi.hive.ddl.HMSDDLExecutor;
 import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
@@ -52,6 +55,7 @@
 import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static org.apache.hudi.sync.common.util.TableUtils.tableId;

@@ -115,7 +119,7 @@ public void dropPartitions(String tableName, List<String> partitionsToDrop) {

   @Override
   public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
-    if (tableProperties == null || tableProperties.isEmpty()) {
+    if (MapUtils.isNullOrEmpty(tableProperties)) {
       return;
     }
     try {
@@ -130,34 +134,29 @@ public void updateTableProperties(String tableName, Map<String, String> tablePro
     }
   }

-  /**
-   * Update the table serde properties to the table.
-   */
   @Override
-  public void updateTableSerDeInfo(String tableName, String serdeClass, Map<String, String> serdeProperties) {
-    if (serdeProperties == null || serdeProperties.isEmpty()) {
+  public void updateSerdeProperties(String tableName, Map<String, String> serdeProperties) {
+    if (MapUtils.isNullOrEmpty(serdeProperties)) {
       return;
     }
     try {
+      serdeProperties.putIfAbsent("serialization.format", "1");
       Table table = client.getTable(databaseName, tableName);
-      serdeProperties.put("serialization.format", "1");
       StorageDescriptor storageDescriptor = table.getSd();
       SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
       if (serdeInfo != null && serdeInfo.getParametersSize() == serdeProperties.size()) {
         Map<String, String> parameters = serdeInfo.getParameters();
-        boolean same = true;
-        for (String key : serdeProperties.keySet()) {
-          if (!parameters.containsKey(key) | !parameters.get(key).equals(serdeProperties.get(key))) {
-            same = false;
-            break;
-          }
-        }
-        if (same) {
-          LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update");
+        boolean different = serdeProperties.entrySet().stream().anyMatch(e ->
+            !parameters.containsKey(e.getKey()) || !parameters.get(e.getKey()).equals(e.getValue()));
+        if (!different) {
+          LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update serde properties.");
           return;
         }
       }
-      storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties));
+
+      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
+      String serDeClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+      storageDescriptor.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
       client.alter_table(databaseName, tableName, table);
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to update table serde info for table: "
@@ -353,11 +352,4 @@ public void updateTableComments(String tableName, List<FieldSchema> fromMetastor
     }
   }

-  Table getTable(String tableName) {
-    try {
-      return client.getTable(databaseName, tableName);
-    } catch (TException e) {
-      throw new HoodieHiveSyncException(String.format("Database: %s, Table: %s does not exist", databaseName, tableName), e);
-    }
-  }
 }
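
The core of the fix is the drift check in updateSerdeProperties above. As a standalone illustration, the snippet below is a minimal sketch (plain java.util maps and made-up property values, not Hudi test code) reproducing the rewritten stream-based comparison and the case that broke the old loop: the old code joined the two conditions with the non-short-circuiting `|`, so a key missing from the existing parameters still evaluated `parameters.get(key).equals(...)` and threw a NullPointerException.

import java.util.HashMap;
import java.util.Map;

public class SerdePropsDriftCheck {

  // Mirrors the rewritten comparison in HoodieHiveSyncClient#updateSerdeProperties:
  // an update is needed if any desired entry is missing from, or differs in,
  // the parameters currently stored in the metastore.
  static boolean different(Map<String, String> parameters, Map<String, String> serdeProperties) {
    return serdeProperties.entrySet().stream().anyMatch(e ->
        !parameters.containsKey(e.getKey()) || !parameters.get(e.getKey()).equals(e.getValue()));
  }

  public static void main(String[] args) {
    // Hypothetical metastore state: same size as the desired map, different keys.
    Map<String, String> existing = new HashMap<>();
    existing.put("serialization.format", "1");
    existing.put("stale.key", "stale");

    Map<String, String> desired = new HashMap<>();
    desired.put("serialization.format", "1");
    desired.put("path", "/tmp/hudi/base_path");

    // Equal sizes pass the getParametersSize() pre-check, so the old loop would
    // run and, because of the non-short-circuiting `|`, would still evaluate
    // parameters.get("path").equals(...) on a null value -> NullPointerException.
    // The `||` version short-circuits instead.
    System.out.println(different(existing, desired)); // prints: true
  }
}

Note that the size pre-check in the real method only decides whether the comparison runs at all; it does not rule out the missing-key case, since two maps of equal size can hold different keys.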
TestHiveSyncTool.java
@@ -39,6 +39,7 @@
 import org.apache.avro.Schema.Field;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -159,9 +160,6 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)

     assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
         "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
-    assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
-        hiveClient.getTable(HiveTestUtil.TABLE_NAME).getSd().getSerdeInfo().getSerializationLib(),
-        "SerDe info not updated or does not match");
     assertEquals(hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME).size(),
         hiveClient.getStorageSchema().getColumns().size() + 1,
         "Hive Schema should match the table schema + partition field");
@@ -303,6 +301,7 @@ public void testSyncCOWTableWithProperties(boolean useSchemaFromCommitMetadata,
     hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
     hiveDriver.getResults(results);
     String ddl = String.join("\n", results);
+    assertTrue(ddl.contains(String.format("ROW FORMAT SERDE \n '%s'", ParquetHiveSerDe.class.getName())));
     assertTrue(ddl.contains("'path'='" + HiveTestUtil.basePath + "'"));
     if (syncAsDataSourceTable) {
       assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE + "'='false'"));
@@ -405,6 +404,7 @@ public void testSyncMORTableWithProperties(boolean useSchemaFromCommitMetadata,
     hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
     hiveDriver.getResults(results);
     String ddl = String.join("\n", results);
+    assertTrue(ddl.contains(String.format("ROW FORMAT SERDE \n '%s'", ParquetHiveSerDe.class.getName())));
     assertTrue(ddl.contains("'path'='" + HiveTestUtil.basePath + "'"));
     assertTrue(ddl.toLowerCase().contains("create external table"));
     if (syncAsDataSourceTable) {
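
Both new assertions look for the Parquet SerDe line in the SHOW CREATE TABLE output. As a quick sanity check of what that fragment expands to, here is a throwaway snippet (assuming hive-exec is on the classpath for ParquetHiveSerDe):

import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;

public class PrintExpectedSerdeFragment {
  public static void main(String[] args) {
    // The same format string the assertions above pass to ddl.contains(...).
    String fragment = String.format("ROW FORMAT SERDE \n '%s'", ParquetHiveSerDe.class.getName());
    // Prints:
    // ROW FORMAT SERDE
    //  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    System.out.println(fragment);
  }
}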
HoodieSyncClient.java
@@ -174,9 +174,10 @@ default void updateTableProperties(String tableName, Map<String, String> tablePr
   }

   /**
-   * Update the table SerDeInfo in metastore.
+   * Update the SerDe properties in metastore.
    */
-  default void updateTableSerDeInfo(String tableName, String serdeClass, Map<String, String> serdeProperties) {
+  default void updateSerdeProperties(String tableName, Map<String, String> serdeProperties) {
+
   }

   /**
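
With the serdeClass parameter gone, callers such as HiveSyncTool only hand over the properties (see the first hunk above) and the client resolves the SerDe class from META_SYNC_BASE_FILE_FORMAT internally. Below is a minimal sketch of the narrowed contract; the SyncClient interface and the values here are simplified stand-ins for illustration, not the real HoodieSyncClient:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UpdateSerdePropertiesSketch {

  // Simplified stand-in for the narrowed interface method above: no serdeClass
  // parameter; a real implementation derives the SerDe class from the table's
  // configured base file format.
  interface SyncClient {
    default void updateSerdeProperties(String tableName, Map<String, String> serdeProperties) {
    }
  }

  public static void main(String[] args) {
    SyncClient client = new SyncClient() {
      @Override
      public void updateSerdeProperties(String tableName, Map<String, String> serdeProperties) {
        // Same no-op guard the implementation gets from MapUtils.isNullOrEmpty.
        if (serdeProperties == null || serdeProperties.isEmpty()) {
          return;
        }
        System.out.println("would sync " + serdeProperties + " to table " + tableName);
      }
    };

    client.updateSerdeProperties("hudi_table", Collections.emptyMap()); // silently returns
    Map<String, String> props = new HashMap<>();
    props.put("path", "/tmp/hudi/base_path"); // hypothetical serde property
    client.updateSerdeProperties("hudi_table", props); // takes the update path
  }
}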
