From e14a6a166ffc1f45f1abf78cbe59f9ecf53316ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=87=E9=A2=86?= Date: Wed, 20 May 2026 15:15:03 +0800 Subject: [PATCH] Refactor auto-creating iceberg tags via ProcessFactory plugin --- .../amoro/server/AmoroManagementConf.java | 18 ----- .../amoro/server/AmoroServiceContainer.java | 1 - .../iceberg/IcebergProcessFactory.java | 31 +++++++- .../iceberg/TagsAutoCreatingProcess.java | 76 +++++++++++++++++++ .../inline/InlineTableExecutors.java | 12 --- .../inline/TagsAutoCreatingExecutor.java | 76 ------------------- .../amoro/server/TestAmoroManagementConf.java | 5 -- .../iceberg/TestIcebergProcessFactory.java | 18 ++++- .../src/test/resources/config-with-units.yaml | 22 ------ .../test/resources/config-without-units.yaml | 3 - .../java/org/apache/amoro/IcebergActions.java | 1 + .../process/TestLocalExecutionEngine.java | 2 + charts/amoro/templates/amoro-configmap.yaml | 5 -- dist/src/main/amoro-bin/conf/config.yaml | 5 -- .../conf/plugins/execute-engines.yaml | 3 +- .../conf/plugins/process-factories.yaml | 2 + docs/admin-guides/deployment.md | 3 + docs/configuration/ams-config.md | 3 - 18 files changed, 132 insertions(+), 154 deletions(-) create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/TagsAutoCreatingProcess.java delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 292a9975fa..d44bb4c3b1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -174,24 +174,6 @@ public class AmoroManagementConf { .defaultValue(10) .withDescription("The number of threads used for refreshing tables."); - public static final ConfigOption AUTO_CREATE_TAGS_ENABLED = - ConfigOptions.key("auto-create-tags.enabled") - .booleanType() - .defaultValue(true) - .withDescription("Enable creating tags."); - - public static final ConfigOption AUTO_CREATE_TAGS_THREAD_COUNT = - ConfigOptions.key("auto-create-tags.thread-count") - .intType() - .defaultValue(3) - .withDescription("The number of threads used for creating tags."); - - public static final ConfigOption AUTO_CREATE_TAGS_INTERVAL = - ConfigOptions.key("auto-create-tags.interval") - .durationType() - .defaultValue(Duration.ofMinutes(1)) - .withDescription("Interval for creating tags."); - public static final ConfigOption REFRESH_TABLES_INTERVAL = ConfigOptions.key("refresh-tables.interval") .durationType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 7956fd0bc1..e579777add 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -294,7 +294,6 @@ public void startOptimizingService() throws Exception { addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor()); tableService.initialize(); LOG.info("AMS table service have been initialized"); tableManager.setTableService(tableService); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java index c90fcde833..ebd0a2905d 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -79,6 +79,14 @@ public class IcebergProcessFactory implements ProcessFactory { public static final ConfigOption DATA_EXPIRE_INTERVAL = ConfigOptions.key("expire-data.interval").durationType().defaultValue(Duration.ofDays(1)); + public static final ConfigOption AUTO_CREATE_TAGS_ENABLED = + ConfigOptions.key("auto-create-tags.enabled").booleanType().defaultValue(true); + + public static final ConfigOption AUTO_CREATE_TAGS_INTERVAL = + ConfigOptions.key("auto-create-tags.interval") + .durationType() + .defaultValue(Duration.ofMinutes(1)); + private ExecuteEngine localEngine; private final Map actions = Maps.newHashMap(); private final List formats = @@ -119,6 +127,8 @@ public Optional trigger(TableRuntime tableRuntime, Action action) return triggerCleanDanglingDelete(tableRuntime); } else if (IcebergActions.EXPIRE_DATA.equals(action)) { return triggerDataExpiring(tableRuntime); + } else if (IcebergActions.AUTO_CREATE_TAGS.equals(action)) { + return triggerAutoCreateTag(tableRuntime); } return Optional.empty(); @@ -135,8 +145,7 @@ public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store) + action); } - // SnapshotsExpiringProcess, OrphanFilesCleaningProcess, DanglingDeleteFilesCleaningProcess - // and DataExpiringProcess are stateless, idempotent one-shot local maintenance tasks + // The following processes are stateless, idempotent one-shot local maintenance tasks // (no checkpoint), so recovery simply rebuilds the process so it can run again. // The store/processId/tracking is owned by ProcessService. if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) { @@ -147,6 +156,8 @@ public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store) return new DanglingDeleteFilesCleaningProcess(tableRuntime, localEngine); } else if (IcebergActions.EXPIRE_DATA.equals(action)) { return new DataExpiringProcess(tableRuntime, localEngine); + } else if (IcebergActions.AUTO_CREATE_TAGS.equals(action)) { + return new TagsAutoCreatingProcess(tableRuntime, localEngine); } throw new RecoverProcessFailedException( @@ -182,6 +193,12 @@ public void open(Map properties) { this.actions.put( IcebergActions.EXPIRE_DATA, ProcessTriggerStrategy.triggerAtFixRate(interval)); } + + if (configs.getBoolean(AUTO_CREATE_TAGS_ENABLED)) { + Duration interval = configs.getDuration(AUTO_CREATE_TAGS_INTERVAL); + this.actions.put( + IcebergActions.AUTO_CREATE_TAGS, ProcessTriggerStrategy.triggerAtFixRate(interval)); + } } private Optional triggerExpireSnapshot(TableRuntime tableRuntime) { @@ -248,6 +265,16 @@ private Optional triggerDataExpiring(TableRuntime tableRuntime) { return Optional.of(new DataExpiringProcess(tableRuntime, localEngine)); } + private Optional triggerAutoCreateTag(TableRuntime tableRuntime) { + if (localEngine == null + || tableRuntime.getFormat() != TableFormat.ICEBERG + || !tableRuntime.getTableConfiguration().getTagConfiguration().isAutoCreateTag()) { + return Optional.empty(); + } + + return Optional.of(new TagsAutoCreatingProcess(tableRuntime, localEngine)); + } + @Override public void close() {} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/TagsAutoCreatingProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/TagsAutoCreatingProcess.java new file mode 100644 index 0000000000..1a841460e8 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/TagsAutoCreatingProcess.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.iceberg; + +import org.apache.amoro.Action; +import org.apache.amoro.AmoroTable; +import org.apache.amoro.IcebergActions; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.maintainer.TableMaintainer; +import org.apache.amoro.process.ExecuteEngine; +import org.apache.amoro.process.LocalProcess; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.optimizing.maintainer.TableMaintainerFactory; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** Local table process for auto-creating Iceberg tags. */ +public class TagsAutoCreatingProcess extends TableProcess implements LocalProcess { + + private static final Logger LOG = LoggerFactory.getLogger(TagsAutoCreatingProcess.class); + + public TagsAutoCreatingProcess(TableRuntime tableRuntime, ExecuteEngine engine) { + super(tableRuntime, engine); + } + + @Override + public String tag() { + return getAction().getName().toLowerCase(); + } + + @Override + public void run() { + try { + AmoroTable amoroTable = tableRuntime.loadTable(); + TableMaintainer tableMaintainer = TableMaintainerFactory.create(amoroTable, tableRuntime); + tableMaintainer.autoCreateTags(); + } catch (Throwable t) { + LOG.error("Failed to create tags on {}", tableRuntime.getTableIdentifier(), t); + throw new RuntimeException(t); + } + } + + @Override + public Action getAction() { + return IcebergActions.AUTO_CREATE_TAGS; + } + + @Override + public Map getProcessParameters() { + return Maps.newHashMap(); + } + + @Override + public Map getSummary() { + return Maps.newHashMap(); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java index ec402a90d3..e13f369502 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java @@ -32,7 +32,6 @@ public class InlineTableExecutors { private OptimizingCommitExecutor optimizingCommitExecutor; private ProcessDataExpiringExecutor processDataExpiringExecutor; private HiveCommitSyncExecutor hiveCommitSyncExecutor; - private TagsAutoCreatingExecutor tagsAutoCreatingExecutor; public static InlineTableExecutors getInstance() { return instance; @@ -71,13 +70,6 @@ public void setup(TableService tableService, Configurations conf) { conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT), conf.get(AmoroManagementConf.REFRESH_TABLES_INTERVAL).toMillis(), conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS)); - if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) { - this.tagsAutoCreatingExecutor = - new TagsAutoCreatingExecutor( - tableService, - conf.getInteger(AmoroManagementConf.AUTO_CREATE_TAGS_THREAD_COUNT), - conf.get(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL).toMillis()); - } } public TableRuntimeRefreshExecutor getTableRefreshingExecutor() { @@ -99,8 +91,4 @@ public ProcessDataExpiringExecutor getProcessDataExpiringExecutor() { public HiveCommitSyncExecutor getHiveCommitSyncExecutor() { return hiveCommitSyncExecutor; } - - public TagsAutoCreatingExecutor getTagsAutoCreatingExecutor() { - return tagsAutoCreatingExecutor; - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java deleted file mode 100644 index b70015e8e8..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.scheduler.inline; - -import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableFormat; -import org.apache.amoro.TableRuntime; -import org.apache.amoro.config.TableConfiguration; -import org.apache.amoro.maintainer.TableMaintainer; -import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; -import org.apache.amoro.server.scheduler.PeriodicTableScheduler; -import org.apache.amoro.server.table.TableService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ThreadLocalRandom; - -/** Service for automatically creating tags for table periodically. */ -public class TagsAutoCreatingExecutor extends PeriodicTableScheduler { - private static final Logger LOG = LoggerFactory.getLogger(TagsAutoCreatingExecutor.class); - - private final long interval; - - protected TagsAutoCreatingExecutor(TableService tableService, int poolSize, long interval) { - super(tableService, poolSize); - this.interval = interval; - } - - @Override - protected long getNextExecutingTime(TableRuntime tableRuntime) { - return interval; - } - - @Override - protected boolean enabled(TableRuntime tableRuntime) { - return tableRuntime.getTableConfiguration().getTagConfiguration().isAutoCreateTag() - && tableRuntime.getFormat() == TableFormat.ICEBERG; - } - - @Override - protected long getExecutorDelay() { - return ThreadLocalRandom.current().nextLong(interval); - } - - @Override - protected void execute(TableRuntime tableRuntime) { - try { - AmoroTable amoroTable = loadTable(tableRuntime); - TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, tableRuntime); - tableMaintainer.autoCreateTags(); - } catch (Throwable t) { - LOG.error("Failed to create tags on {}", tableRuntime.getTableIdentifier(), t); - } - } - - @Override - public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { - scheduleIfNecessary(tableRuntime, getStartDelay()); - } -} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java index 99e8b451a7..414f760c2d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java @@ -45,7 +45,6 @@ public class TestAmoroManagementConf { private static final ConfigOption[] TIME_RELATED_CONFIG_OPTIONS = new ConfigOption[] { AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL, - AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL, AmoroManagementConf.REFRESH_TABLES_INTERVAL, AmoroManagementConf.BLOCKER_TIMEOUT, AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, @@ -57,7 +56,6 @@ public class TestAmoroManagementConf { private static final Map DEFAULT_TIME_UNIT_IN_OLD_VERSIONS = ImmutableMap.builder() .put(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL.key(), "ms") - .put(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL.key(), "ms") .put(AmoroManagementConf.REFRESH_TABLES_INTERVAL.key(), "ms") .put(AmoroManagementConf.BLOCKER_TIMEOUT.key(), "ms") .put(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT.key(), "ms") @@ -228,7 +226,6 @@ private void assertStorageRelatedConfigs( .put("optimizer.task-ack-timeout", "30000") .put("optimizer.polling-timeout", "3000") .put("blocker.timeout", "60000") - .put("auto-create-tags.interval", "60000") .put("terminal.session.timeout", "1800000") .build(); @@ -240,7 +237,6 @@ private void assertStorageRelatedConfigs( .put("optimizer.task-ack-timeout", "60 s") .put("optimizer.polling-timeout", "6 s") .put("blocker.timeout", "2 min") - .put("auto-create-tags.interval", "2 min") .put("terminal.session.timeout", "30 ms") .build(); @@ -252,7 +248,6 @@ private void assertStorageRelatedConfigs( .put("optimizer.task-ack-timeout", "60 s") .put("optimizer.polling-timeout", "6 s") .put("blocker.timeout", "2 min") - .put("auto-create-tags.interval", "2 min") .put("terminal.session.timeout", "30 min") .build(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java index 1f1f57ca7a..cd68d03822 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java @@ -27,6 +27,7 @@ import org.apache.amoro.TableRuntime; import org.apache.amoro.config.DataExpirationConfig; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.config.TagConfiguration; import org.apache.amoro.process.LocalExecutionEngine; import org.apache.amoro.process.ProcessTriggerStrategy; import org.apache.amoro.process.RecoverProcessFailedException; @@ -67,6 +68,8 @@ public void testTriggerActionWhenDue() { DanglingDeleteFilesCleaningProcess.class, 0); assertTriggerWhenDue("expire-data", IcebergActions.EXPIRE_DATA, DataExpiringProcess.class, 0); + assertTriggerWhenDue( + "auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, TagsAutoCreatingProcess.class, 0); } @Test @@ -89,6 +92,7 @@ public void testTriggerActionDisabled() { assertTriggerDisabled( "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, false, 0); assertTriggerDisabled("expire-data", IcebergActions.EXPIRE_DATA, false, 0); + assertTriggerDisabled("auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, false, 0); } @Test @@ -116,6 +120,12 @@ public void testRecoverDataExpiringProcess() { assertRecover("expire-data", IcebergActions.EXPIRE_DATA, DataExpiringProcess.class); } + @Test + public void testRecoverAutoCreateTagProcess() { + assertRecover( + "auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, TagsAutoCreatingProcess.class); + } + @Test public void testRecoverUnsupportedActionThrows() { IcebergProcessFactory factory = openedFactory("expire-snapshots"); @@ -258,6 +268,10 @@ private TableRuntime createRuntime(String configKey, boolean enabled, long lastT tableConfiguration.setDeleteDanglingDeleteFilesEnabled(enabled); } else if ("expire-data".equals(configKey)) { tableConfiguration.setExpiringDataConfig(new DataExpirationConfig().setEnabled(enabled)); + } else if ("auto-create-tags".equals(configKey)) { + TagConfiguration tagConfiguration = new TagConfiguration(); + tagConfiguration.setAutoCreateTag(enabled); + tableConfiguration.setTagConfiguration(tagConfiguration); } TableRuntimeCleanupState cleanupState = new TableRuntimeCleanupState(); @@ -274,7 +288,9 @@ private TableRuntime createRuntime(String configKey, boolean enabled, long lastT TableRuntime runtime = mock(TableRuntime.class); doReturn(tableConfiguration).when(runtime).getTableConfiguration(); doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY); - + if ("auto-create-tags".equals(configKey)) { + doReturn(TableFormat.ICEBERG).when(runtime).getFormat(); + } return runtime; } } diff --git a/amoro-ams/src/test/resources/config-with-units.yaml b/amoro-ams/src/test/resources/config-with-units.yaml index 7f307de7d7..b3a571e9b8 100644 --- a/amoro-ams/src/test/resources/config-with-units.yaml +++ b/amoro-ams/src/test/resources/config-with-units.yaml @@ -68,33 +68,11 @@ ams: timeout: 1min # optional features - expire-snapshots: - enabled: true - thread-count: 10 - - clean-orphan-files: - enabled: true - thread-count: 10 - interval: 1d - - clean-dangling-delete-files: - enabled: true - thread-count: 10 sync-hive-tables: enabled: false thread-count: 10 - data-expiration: - enabled: true - thread-count: 10 - interval: 1d - - auto-create-tags: - enabled: true - thread-count: 3 - interval: 1min - table-manifest-io: thread-count: 20 diff --git a/amoro-ams/src/test/resources/config-without-units.yaml b/amoro-ams/src/test/resources/config-without-units.yaml index a78164f2ca..9c587c37b0 100644 --- a/amoro-ams/src/test/resources/config-without-units.yaml +++ b/amoro-ams/src/test/resources/config-without-units.yaml @@ -33,9 +33,6 @@ ams: blocker: timeout: 120000 # 2min - auto-create-tags: - interval: 120000 # 2min - terminal: session: timeout: 30 # 30min when version < 0.8 \ No newline at end of file diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index 3314187d0c..ed10001194 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -30,4 +30,5 @@ public class IcebergActions { public static final Action EXPIRE_DATA = Action.register("expire-data"); public static final Action EXPIRE_SNAPSHOTS = Action.register("expire-snapshots"); public static final Action CLEAN_DANGLING_DELETE = Action.register("clean-dangling-delete-files"); + public static final Action AUTO_CREATE_TAGS = Action.register("auto-create-tags"); } diff --git a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java index 7ffa2a98bc..6a51db7db6 100644 --- a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java +++ b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java @@ -49,6 +49,7 @@ public void testSubmitUsesCustomPoolByTag() throws Exception { assertCustomPoolByTag("clean-orphan-files"); assertCustomPoolByTag("clean-dangling-delete-files"); assertCustomPoolByTag("expire-data"); + assertCustomPoolByTag("auto-create-tags"); } private void assertCustomPoolByTag(String tag) throws Exception { @@ -158,6 +159,7 @@ private LocalExecutionEngine createEngineWithTtl(String ttl) { properties.put("pool.clean-orphan-files.thread-count", "1"); properties.put("pool.clean-dangling-delete-files.thread-count", "1"); properties.put("pool.expire-data.thread-count", "1"); + properties.put("pool.auto-create-tags.thread-count", "1"); properties.put("process.status.ttl", ttl); localEngine.open(properties); return localEngine; diff --git a/charts/amoro/templates/amoro-configmap.yaml b/charts/amoro/templates/amoro-configmap.yaml index 7fcde4974f..fb5cfa925a 100644 --- a/charts/amoro/templates/amoro-configmap.yaml +++ b/charts/amoro/templates/amoro-configmap.yaml @@ -103,11 +103,6 @@ data: enabled: false thread-count: 10 - auto-create-tags: - enabled: true - thread-count: 3 - interval: 1min # 60000 - table-manifest-io: thread-count: 20 diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml index 4e6b66e8c5..707673169c 100644 --- a/dist/src/main/amoro-bin/conf/config.yaml +++ b/dist/src/main/amoro-bin/conf/config.yaml @@ -112,11 +112,6 @@ ams: enabled: false thread-count: 10 - auto-create-tags: - enabled: true - thread-count: 3 - interval: 1min # 60000 - table-manifest-io: thread-count: 20 diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml index 9cd9aa6b91..c68fdab4d7 100755 --- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml @@ -24,4 +24,5 @@ execute-engines: pool.expire-snapshots.thread-count: 10 pool.clean-orphan-files.thread-count: 10 pool.clean-dangling-delete-files.thread-count: 10 - pool.expire-data.thread-count: 10 \ No newline at end of file + pool.expire-data.thread-count: 10 + pool.auto-create-tags.thread-count: 3 \ No newline at end of file diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml index 2adb757c36..cf5d8ad085 100755 --- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml @@ -28,3 +28,5 @@ process-factories: clean-dangling-delete-files.interval: "1d" expire-data.enabled: "true" expire-data.interval: "1d" + auto-create-tags.enabled: "true" + auto-create-tags.interval: "1min" diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md index 3ce65c2e41..52c67f19fe 100644 --- a/docs/admin-guides/deployment.md +++ b/docs/admin-guides/deployment.md @@ -278,6 +278,8 @@ process-factories: clean-dangling-delete-files.interval: "1d" # interval for cleaning dangling delete files expire-data.enabled: "true" # enable data expiration expire-data.interval: "1d" # interval for data expiration + auto-create-tags.enabled: "true" # enable auto creating tags + auto-create-tags.interval: "1m" # interval for auto creating tags ``` {{< hint info >}} @@ -310,6 +312,7 @@ execute-engines: pool.clean-orphan-files.thread-count: 10 # thread pool for orphan file cleaning pool.clean-dangling-delete-files.thread-count: 10 # thread pool for dangling delete files cleaning pool.expire-data.thread-count: 10 # thread pool for data expiration + pool.auto-create-tags.thread-count: 3 # thread pool for auto creating tags process.status.ttl: 4h # TTL for process status cache ``` diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md index d03d149cfc..d886aa8e5a 100644 --- a/docs/configuration/ams-config.md +++ b/docs/configuration/ams-config.md @@ -44,9 +44,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; } | --- | ------- | ----------- | | admin-password | admin | The administrator password | | admin-username | admin | The administrator account name. | -| auto-create-tags.enabled | true | Enable creating tags. | -| auto-create-tags.interval | 1 min | Interval for creating tags. | -| auto-create-tags.thread-count | 3 | The number of threads used for creating tags. | | blocker.timeout | 1 min | Session timeout. Default unit is milliseconds if not specified. | | catalog-meta-cache.expiration-interval | 1 min | TTL for catalog metadata. | | database.auto-create-tables | true | Auto init table schema when started |