diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index d44bb4c3b1..f38f6f5acc 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -156,18 +156,6 @@ public class AmoroManagementConf { .defaultValue(1000000) .withDescription("The queue size of the executors of the external catalog explorer."); - public static final ConfigOption SYNC_HIVE_TABLES_ENABLED = - ConfigOptions.key("sync-hive-tables.enabled") - .booleanType() - .defaultValue(false) - .withDescription("Enable synchronizing Hive tables."); - - public static final ConfigOption SYNC_HIVE_TABLES_THREAD_COUNT = - ConfigOptions.key("sync-hive-tables.thread-count") - .intType() - .defaultValue(10) - .withDescription("The number of threads used for synchronizing Hive tables."); - public static final ConfigOption REFRESH_TABLES_THREAD_COUNT = ConfigOptions.key("refresh-tables.thread-count") .intType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java index 7c767aa14b..c249f374cf 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java @@ -81,10 +81,6 @@ public static void validateConfig(Configurations configurations) { validateThreadCount(configurations, AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT); validateThreadCount(configurations, AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT); - - if (configurations.getBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED)) { - validateThreadCount(configurations, AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT); - } } private static void validateThreadCount( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index e579777add..6383549830 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -292,7 +292,6 @@ public void startOptimizingService() throws Exception { addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor()); tableService.initialize(); LOG.info("AMS table service have been initialized"); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/HiveCommitSyncProcess.java similarity index 60% rename from amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java rename to amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/HiveCommitSyncProcess.java index 6bc46a382a..c939bce5f4 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/HiveCommitSyncProcess.java @@ -16,69 +16,75 @@ * limitations under the License. */ -package org.apache.amoro.server.scheduler.inline; +package org.apache.amoro.server.process.iceberg; +import org.apache.amoro.Action; +import org.apache.amoro.AmoroTable; +import org.apache.amoro.IcebergActions; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; import org.apache.amoro.hive.table.SupportHive; import org.apache.amoro.hive.utils.HiveMetaSynchronizer; import org.apache.amoro.hive.utils.TableTypeUtil; -import org.apache.amoro.server.scheduler.PeriodicTableScheduler; -import org.apache.amoro.server.table.TableService; +import org.apache.amoro.process.ExecuteEngine; +import org.apache.amoro.process.LocalProcess; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.table.MixedTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ThreadLocalRandom; +import java.util.Map; -public class HiveCommitSyncExecutor extends PeriodicTableScheduler { - private static final Logger LOG = LoggerFactory.getLogger(HiveCommitSyncExecutor.class); +/** Local table process for syncing Iceberg metadata to Hive. */ +public class HiveCommitSyncProcess extends TableProcess implements LocalProcess { - // 10 minutes - private static final long INTERVAL = 10 * 60 * 1000L; + private static final Logger LOG = LoggerFactory.getLogger(HiveCommitSyncProcess.class); - public HiveCommitSyncExecutor(TableService tableService, int poolSize) { - super(tableService, poolSize); - } - - @Override - protected long getNextExecutingTime(TableRuntime tableRuntime) { - return INTERVAL; + public static void syncIcebergToHive(MixedTable mixedTable) { + HiveMetaSynchronizer.syncMixedTableDataToHive((SupportHive) mixedTable); } - @Override - protected boolean enabled(TableRuntime tableRuntime) { - return true; + public HiveCommitSyncProcess(TableRuntime tableRuntime, ExecuteEngine engine) { + super(tableRuntime, engine); } @Override - protected long getExecutorDelay() { - return ThreadLocalRandom.current().nextLong(INTERVAL); + public String tag() { + return getAction().getName().toLowerCase(); } @Override - protected void execute(TableRuntime tableRuntime) { - long startTime = System.currentTimeMillis(); + public void run() { ServerTableIdentifier tableIdentifier = tableRuntime.getTableIdentifier(); try { - MixedTable mixedTable = (MixedTable) loadTable(tableRuntime).originalTable(); + AmoroTable amoroTable = tableRuntime.loadTable(); + MixedTable mixedTable = (MixedTable) amoroTable.originalTable(); if (!TableTypeUtil.isHive(mixedTable)) { LOG.debug("{} is not a support hive table", tableIdentifier); return; } + LOG.info("{} start hive sync", tableIdentifier); syncIcebergToHive(mixedTable); } catch (Exception e) { LOG.error("{} hive sync failed", tableIdentifier, e); - } finally { - LOG.info( - "{} hive sync finished, cost {}ms", - tableIdentifier, - System.currentTimeMillis() - startTime); + throw new RuntimeException(e); } } - public static void syncIcebergToHive(MixedTable mixedTable) { - HiveMetaSynchronizer.syncMixedTableDataToHive((SupportHive) mixedTable); + @Override + public Action getAction() { + return IcebergActions.SYNC_HIVE_TABLES; + } + + @Override + public Map getProcessParameters() { + return Maps.newHashMap(); + } + + @Override + public Map getSummary() { + return Maps.newHashMap(); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java index ebd0a2905d..a08195613d 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -87,6 +87,14 @@ public class IcebergProcessFactory implements ProcessFactory { .durationType() .defaultValue(Duration.ofMinutes(1)); + public static final ConfigOption SYNC_HIVE_TABLES_ENABLED = + ConfigOptions.key("sync-hive-tables.enabled").booleanType().defaultValue(false); + + public static final ConfigOption SYNC_HIVE_TABLES_INTERVAL = + ConfigOptions.key("sync-hive-tables.interval") + .durationType() + .defaultValue(Duration.ofMinutes(10)); + private ExecuteEngine localEngine; private final Map actions = Maps.newHashMap(); private final List formats = @@ -129,6 +137,8 @@ public Optional trigger(TableRuntime tableRuntime, Action action) return triggerDataExpiring(tableRuntime); } else if (IcebergActions.AUTO_CREATE_TAGS.equals(action)) { return triggerAutoCreateTag(tableRuntime); + } else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) { + return triggerHiveCommitSync(tableRuntime); } return Optional.empty(); @@ -158,6 +168,8 @@ public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store) return new DataExpiringProcess(tableRuntime, localEngine); } else if (IcebergActions.AUTO_CREATE_TAGS.equals(action)) { return new TagsAutoCreatingProcess(tableRuntime, localEngine); + } else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) { + return new HiveCommitSyncProcess(tableRuntime, localEngine); } throw new RecoverProcessFailedException( @@ -199,6 +211,12 @@ public void open(Map properties) { this.actions.put( IcebergActions.AUTO_CREATE_TAGS, ProcessTriggerStrategy.triggerAtFixRate(interval)); } + + if (configs.getBoolean(SYNC_HIVE_TABLES_ENABLED)) { + Duration interval = configs.getDuration(SYNC_HIVE_TABLES_INTERVAL); + this.actions.put( + IcebergActions.SYNC_HIVE_TABLES, ProcessTriggerStrategy.triggerAtFixRate(interval)); + } } private Optional triggerExpireSnapshot(TableRuntime tableRuntime) { @@ -275,6 +293,14 @@ private Optional triggerAutoCreateTag(TableRuntime tableRuntime) { return Optional.of(new TagsAutoCreatingProcess(tableRuntime, localEngine)); } + private Optional triggerHiveCommitSync(TableRuntime tableRuntime) { + if (localEngine == null) { + return Optional.empty(); + } + + return Optional.of(new HiveCommitSyncProcess(tableRuntime, localEngine)); + } + @Override public void close() {} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java index e13f369502..52c96ed1d2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java @@ -31,7 +31,6 @@ public class InlineTableExecutors { private BlockerExpiringExecutor blockerExpiringExecutor; private OptimizingCommitExecutor optimizingCommitExecutor; private ProcessDataExpiringExecutor processDataExpiringExecutor; - private HiveCommitSyncExecutor hiveCommitSyncExecutor; public static InlineTableExecutors getInstance() { return instance; @@ -59,11 +58,6 @@ public void setup(TableService tableService, Configurations conf) { new ProcessDataExpiringExecutor( tableService, optimizingKeepTime, expireInterval, processKeepTime); this.blockerExpiringExecutor = new BlockerExpiringExecutor(tableService); - if (conf.getBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED)) { - this.hiveCommitSyncExecutor = - new HiveCommitSyncExecutor( - tableService, conf.getInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT)); - } this.tableRefreshingExecutor = new TableRuntimeRefreshExecutor( tableService, @@ -87,8 +81,4 @@ public OptimizingCommitExecutor getOptimizingCommitExecutor() { public ProcessDataExpiringExecutor getProcessDataExpiringExecutor() { return processDataExpiringExecutor; } - - public HiveCommitSyncExecutor getHiveCommitSyncExecutor() { - return hiveCommitSyncExecutor; - } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java index 916765d263..07898363a9 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java @@ -369,7 +369,6 @@ private String getAmsConfig() { + " refresh-table-thread-count: 10\n" + " refresh-table-interval: 60000 #1min\n" + " expire-table-thread-count: 10\n" - + " sync-hive-tables-thread-count: 10\n" + "\n" + " thrift-server:\n" + " max-message-size: 104857600 # 100MB\n" diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java index 40f88575db..8e17c8783c 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java @@ -126,13 +126,5 @@ public void testValidateThreadCount() { () -> AmoroManagementConfValidator.validateConfig(configurations)); configurations.setInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT, 10); AmoroManagementConfValidator.validateConfig(configurations); - - configurations.setBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED, true); - configurations.setInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT, -1); - Assert.assertThrows( - IllegalArgumentException.class, - () -> AmoroManagementConfValidator.validateConfig(configurations)); - configurations.setInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT, 10); - AmoroManagementConfValidator.validateConfig(configurations); } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java index cd68d03822..247182fb29 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java @@ -54,6 +54,7 @@ public void testOpenAndSupportedActions() { assertSupportedAction( "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, Duration.ofHours(24)); assertSupportedAction("expire-data", IcebergActions.EXPIRE_DATA, Duration.ofHours(24)); + assertSupportedAction("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, Duration.ofHours(1)); } @Test @@ -70,6 +71,8 @@ public void testTriggerActionWhenDue() { assertTriggerWhenDue("expire-data", IcebergActions.EXPIRE_DATA, DataExpiringProcess.class, 0); assertTriggerWhenDue( "auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, TagsAutoCreatingProcess.class, 0); + assertTriggerWhenDue( + "sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, HiveCommitSyncProcess.class, 0); } @Test @@ -126,6 +129,11 @@ public void testRecoverAutoCreateTagProcess() { "auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, TagsAutoCreatingProcess.class); } + @Test + public void testRecoverSyncHiveProcess() { + assertRecover("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, HiveCommitSyncProcess.class); + } + @Test public void testRecoverUnsupportedActionThrows() { IcebergProcessFactory factory = openedFactory("expire-snapshots"); diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index ed10001194..b454bc9c10 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -26,7 +26,7 @@ public class IcebergActions { public static final Action SYSTEM = Action.register("system"); public static final Action REWRITE = Action.register("rewrite"); public static final Action CLEAN_ORPHAN = Action.register("clean-orphan-files"); - public static final Action SYNC_HIVE = Action.register("sync-hive"); + public static final Action SYNC_HIVE_TABLES = Action.register("sync-hive-tables"); public static final Action EXPIRE_DATA = Action.register("expire-data"); public static final Action EXPIRE_SNAPSHOTS = Action.register("expire-snapshots"); public static final Action CLEAN_DANGLING_DELETE = Action.register("clean-dangling-delete-files"); diff --git a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java index 6a51db7db6..fb91cb63bd 100644 --- a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java +++ b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java @@ -50,6 +50,7 @@ public void testSubmitUsesCustomPoolByTag() throws Exception { assertCustomPoolByTag("clean-dangling-delete-files"); assertCustomPoolByTag("expire-data"); assertCustomPoolByTag("auto-create-tags"); + assertCustomPoolByTag("sync-hive-tables"); } private void assertCustomPoolByTag(String tag) throws Exception { @@ -160,6 +161,7 @@ private LocalExecutionEngine createEngineWithTtl(String ttl) { properties.put("pool.clean-dangling-delete-files.thread-count", "1"); properties.put("pool.expire-data.thread-count", "1"); properties.put("pool.auto-create-tags.thread-count", "1"); + properties.put("pool.sync-hive-tables.thread-count", "1"); properties.put("process.status.ttl", ttl); localEngine.open(properties); return localEngine; diff --git a/charts/amoro/templates/amoro-configmap.yaml b/charts/amoro/templates/amoro-configmap.yaml index fb5cfa925a..7a9f14ba59 100644 --- a/charts/amoro/templates/amoro-configmap.yaml +++ b/charts/amoro/templates/amoro-configmap.yaml @@ -99,10 +99,6 @@ data: timeout: 1min # 60000 # optional features - sync-hive-tables: - enabled: false - thread-count: 10 - table-manifest-io: thread-count: 20 diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml index 707673169c..f4cdaaac8f 100644 --- a/dist/src/main/amoro-bin/conf/config.yaml +++ b/dist/src/main/amoro-bin/conf/config.yaml @@ -108,10 +108,6 @@ ams: ~ # optional features - sync-hive-tables: - enabled: false - thread-count: 10 - table-manifest-io: thread-count: 20 diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml index c68fdab4d7..14ebcc4e90 100755 --- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml @@ -25,4 +25,5 @@ execute-engines: pool.clean-orphan-files.thread-count: 10 pool.clean-dangling-delete-files.thread-count: 10 pool.expire-data.thread-count: 10 - pool.auto-create-tags.thread-count: 3 \ No newline at end of file + pool.auto-create-tags.thread-count: 3 + pool.sync-hive-tables.thread-count: 10 \ No newline at end of file diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml index cf5d8ad085..1085d364fe 100755 --- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml @@ -30,3 +30,5 @@ process-factories: expire-data.interval: "1d" auto-create-tags.enabled: "true" auto-create-tags.interval: "1min" + sync-hive-tables.enabled: "false" + sync-hive-tables.interval: "10min" diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md index 52c67f19fe..14f32149fd 100644 --- a/docs/admin-guides/deployment.md +++ b/docs/admin-guides/deployment.md @@ -280,6 +280,8 @@ process-factories: expire-data.interval: "1d" # interval for data expiration auto-create-tags.enabled: "true" # enable auto creating tags auto-create-tags.interval: "1m" # interval for auto creating tags + sync-hive-tables.enabled: "false" # enable synchronizing Hive tables + sync-hive-tables.interval: "10min" # interval for synchronizing Hive tables ``` {{< hint info >}} @@ -308,11 +310,12 @@ execute-engines: priority: 100 properties: pool.default.thread-count: 10 # default thread pool size - pool.expire-snapshots.thread-count: 10 # thread pool for snapshot expiration - pool.clean-orphan-files.thread-count: 10 # thread pool for orphan file cleaning + pool.expire-snapshots.thread-count: 10 # thread pool for snapshot expiration + pool.clean-orphan-files.thread-count: 10 # thread pool for orphan file cleaning pool.clean-dangling-delete-files.thread-count: 10 # thread pool for dangling delete files cleaning pool.expire-data.thread-count: 10 # thread pool for data expiration - pool.auto-create-tags.thread-count: 3 # thread pool for auto creating tags + pool.auto-create-tags.thread-count: 3 # thread pool for auto creating tags + pool.sync-hive-tables.thread-count: 10 # thread pool for synchronizing Hive tables process.status.ttl: 4h # TTL for process status cache ``` diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md index d886aa8e5a..2a6c6b6e8e 100644 --- a/docs/configuration/ams-config.md +++ b/docs/configuration/ams-config.md @@ -116,8 +116,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; } | self-optimizing.runtime-data-keep-time | 30 d | Duration that self-optimizing runtime data is retained. | | server-bind-host | 0.0.0.0 | The host bound to the server. | | server-expose-host | | The exposed host of the server. | -| sync-hive-tables.enabled | false | Enable synchronizing Hive tables. | -| sync-hive-tables.thread-count | 10 | The number of threads used for synchronizing Hive tables. | | table-manifest-io.thread-count | 20 | Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing manifests in the base table implementation across all concurrent planning or commit operations. | | terminal.backend | local | Terminal backend implementation. local, kyuubi and custom are valid values. | | terminal.factory | <undefined> | Session factory implement of terminal, `terminal.backend` must be `custom` if this is set. |