From fe8c6d4f4ce395e649ecc886d30c35a2bce858c4 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Tue, 20 Sep 2022 20:22:30 +0800 Subject: [PATCH] [HUDI-4326] Fix hive sync serde properties (#6722) --- .../org/apache/hudi/hive/HiveSyncTool.java | 4 +- .../hudi/hive/HoodieHiveSyncClient.java | 40 ++++++++----------- .../apache/hudi/hive/TestHiveSyncTool.java | 6 +-- .../sync/common/HoodieMetaSyncOperations.java | 5 ++- 4 files changed, 23 insertions(+), 32 deletions(-) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index ce3114b92e008..d0a40bbc181c5 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -290,9 +290,7 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea // Sync the table properties if the schema has changed if (config.getString(HIVE_TABLE_PROPERTIES) != null || config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) { syncClient.updateTableProperties(tableName, tableProperties); - HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase()); - String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat); - syncClient.updateTableSerDeInfo(tableName, serDeFormatClassName, serdeProperties); + syncClient.updateSerdeProperties(tableName, serdeProperties); LOG.info("Sync table properties for " + tableName + ", table properties is: " + tableProperties); } schemaChanged = true; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index a740c93d65af1..1bdc87ab1123d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -18,11 +18,14 @@ package org.apache.hudi.hive; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.MapUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hive.ddl.DDLExecutor; import org.apache.hudi.hive.ddl.HMSDDLExecutor; import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor; @@ -52,6 +55,7 @@ import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; import static org.apache.hudi.sync.common.util.TableUtils.tableId; @@ -115,7 +119,7 @@ public void dropPartitions(String tableName, List partitionsToDrop) { @Override public void updateTableProperties(String tableName, Map tableProperties) { - if (tableProperties == null || tableProperties.isEmpty()) { + if (MapUtils.isNullOrEmpty(tableProperties)) { return; } try { @@ -130,34 +134,29 @@ public void updateTableProperties(String tableName, Map tablePro } } - /** - * Update the table serde properties to the table. - */ @Override - public void updateTableSerDeInfo(String tableName, String serdeClass, Map serdeProperties) { - if (serdeProperties == null || serdeProperties.isEmpty()) { + public void updateSerdeProperties(String tableName, Map serdeProperties) { + if (MapUtils.isNullOrEmpty(serdeProperties)) { return; } try { + serdeProperties.putIfAbsent("serialization.format", "1"); Table table = client.getTable(databaseName, tableName); - serdeProperties.put("serialization.format", "1"); StorageDescriptor storageDescriptor = table.getSd(); SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo(); if (serdeInfo != null && serdeInfo.getParametersSize() == serdeProperties.size()) { Map parameters = serdeInfo.getParameters(); - boolean same = true; - for (String key : serdeProperties.keySet()) { - if (!parameters.containsKey(key) | !parameters.get(key).equals(serdeProperties.get(key))) { - same = false; - break; - } - } - if (same) { - LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update"); + boolean different = serdeProperties.entrySet().stream().anyMatch(e -> + !parameters.containsKey(e.getKey()) || !parameters.get(e.getKey()).equals(e.getValue())); + if (!different) { + LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update serde properties."); return; } } - storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties)); + + HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase()); + String serDeClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat); + storageDescriptor.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties)); client.alter_table(databaseName, tableName, table); } catch (Exception e) { throw new HoodieHiveSyncException("Failed to update table serde info for table: " @@ -353,11 +352,4 @@ public void updateTableComments(String tableName, List fromMetastor } } - Table getTable(String tableName) { - try { - return client.getTable(databaseName, tableName); - } catch (TException e) { - throw new HoodieHiveSyncException(String.format("Database: %s, Table: %s does not exist", databaseName, tableName), e); - } - } } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index ba9d33a662c21..acd75595fb887 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -39,6 +39,7 @@ import org.apache.avro.Schema.Field; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.session.SessionState; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -159,9 +160,6 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); - assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", - hiveClient.getTable(HiveTestUtil.TABLE_NAME).getSd().getSerdeInfo().getSerializationLib(), - "SerDe info not updated or does not match"); assertEquals(hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME).size(), hiveClient.getStorageSchema().getColumns().size() + 1, "Hive Schema should match the table schema + partition field"); @@ -303,6 +301,7 @@ public void testSyncCOWTableWithProperties(boolean useSchemaFromCommitMetadata, hiveDriver.run("SHOW CREATE TABLE " + dbTableName); hiveDriver.getResults(results); String ddl = String.join("\n", results); + assertTrue(ddl.contains(String.format("ROW FORMAT SERDE \n '%s'", ParquetHiveSerDe.class.getName()))); assertTrue(ddl.contains("'path'='" + HiveTestUtil.basePath + "'")); if (syncAsDataSourceTable) { assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE + "'='false'")); @@ -405,6 +404,7 @@ public void testSyncMORTableWithProperties(boolean useSchemaFromCommitMetadata, hiveDriver.run("SHOW CREATE TABLE " + dbTableName); hiveDriver.getResults(results); String ddl = String.join("\n", results); + assertTrue(ddl.contains(String.format("ROW FORMAT SERDE \n '%s'", ParquetHiveSerDe.class.getName()))); assertTrue(ddl.contains("'path'='" + HiveTestUtil.basePath + "'")); assertTrue(ddl.toLowerCase().contains("create external table")); if (syncAsDataSourceTable) { diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java index 5afcf80a877e8..49edbffd454a2 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java @@ -174,9 +174,10 @@ default void updateTableProperties(String tableName, Map tablePr } /** - * Update the table SerDeInfo in metastore. + * Update the SerDe properties in metastore. */ - default void updateTableSerDeInfo(String tableName, String serdeClass, Map serdeProperties) { + default void updateSerdeProperties(String tableName, Map serdeProperties) { + } /**