Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,24 +174,6 @@ public class AmoroManagementConf {
.defaultValue(10)
.withDescription("The number of threads used for refreshing tables.");

public static final ConfigOption<Boolean> AUTO_CREATE_TAGS_ENABLED =
ConfigOptions.key("auto-create-tags.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Enable creating tags.");

public static final ConfigOption<Integer> AUTO_CREATE_TAGS_THREAD_COUNT =
ConfigOptions.key("auto-create-tags.thread-count")
.intType()
.defaultValue(3)
.withDescription("The number of threads used for creating tags.");

public static final ConfigOption<Duration> AUTO_CREATE_TAGS_INTERVAL =
ConfigOptions.key("auto-create-tags.interval")
.durationType()
.defaultValue(Duration.ofMinutes(1))
.withDescription("Interval for creating tags.");

public static final ConfigOption<Duration> REFRESH_TABLES_INTERVAL =
ConfigOptions.key("refresh-tables.interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ public void startOptimizingService() throws Exception {
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor());
tableService.initialize();
LOG.info("AMS table service have been initialized");
tableManager.setTableService(tableService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public class IcebergProcessFactory implements ProcessFactory {
public static final ConfigOption<Duration> DATA_EXPIRE_INTERVAL =
ConfigOptions.key("expire-data.interval").durationType().defaultValue(Duration.ofDays(1));

public static final ConfigOption<Boolean> AUTO_CREATE_TAGS_ENABLED =
ConfigOptions.key("auto-create-tags.enabled").booleanType().defaultValue(true);

public static final ConfigOption<Duration> AUTO_CREATE_TAGS_INTERVAL =
ConfigOptions.key("auto-create-tags.interval")
.durationType()
.defaultValue(Duration.ofMinutes(1));

private ExecuteEngine localEngine;
private final Map<Action, ProcessTriggerStrategy> actions = Maps.newHashMap();
private final List<TableFormat> formats =
Expand Down Expand Up @@ -119,6 +127,8 @@ public Optional<TableProcess> trigger(TableRuntime tableRuntime, Action action)
return triggerCleanDanglingDelete(tableRuntime);
} else if (IcebergActions.EXPIRE_DATA.equals(action)) {
return triggerDataExpiring(tableRuntime);
} else if (IcebergActions.AUTO_CREATE_TAGS.equals(action)) {
return triggerAutoCreateTag(tableRuntime);
}

return Optional.empty();
Expand All @@ -135,8 +145,7 @@ public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store)
+ action);
}

// SnapshotsExpiringProcess, OrphanFilesCleaningProcess, DanglingDeleteFilesCleaningProcess
// and DataExpiringProcess are stateless, idempotent one-shot local maintenance tasks
// The following processes are stateless, idempotent one-shot local maintenance tasks
// (no checkpoint), so recovery simply rebuilds the process so it can run again.
// The store/processId/tracking is owned by ProcessService.
if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
Expand All @@ -147,6 +156,8 @@ public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store)
return new DanglingDeleteFilesCleaningProcess(tableRuntime, localEngine);
} else if (IcebergActions.EXPIRE_DATA.equals(action)) {
return new DataExpiringProcess(tableRuntime, localEngine);
} else if (IcebergActions.AUTO_CREATE_TAGS.equals(action)) {
return new TagsAutoCreatingProcess(tableRuntime, localEngine);
}

throw new RecoverProcessFailedException(
Expand Down Expand Up @@ -182,6 +193,12 @@ public void open(Map<String, String> properties) {
this.actions.put(
IcebergActions.EXPIRE_DATA, ProcessTriggerStrategy.triggerAtFixRate(interval));
}

if (configs.getBoolean(AUTO_CREATE_TAGS_ENABLED)) {
Duration interval = configs.getDuration(AUTO_CREATE_TAGS_INTERVAL);
this.actions.put(
IcebergActions.AUTO_CREATE_TAGS, ProcessTriggerStrategy.triggerAtFixRate(interval));
}
}

private Optional<TableProcess> triggerExpireSnapshot(TableRuntime tableRuntime) {
Expand Down Expand Up @@ -248,6 +265,16 @@ private Optional<TableProcess> triggerDataExpiring(TableRuntime tableRuntime) {
return Optional.of(new DataExpiringProcess(tableRuntime, localEngine));
}

private Optional<TableProcess> triggerAutoCreateTag(TableRuntime tableRuntime) {
if (localEngine == null
|| tableRuntime.getFormat() != TableFormat.ICEBERG
|| !tableRuntime.getTableConfiguration().getTagConfiguration().isAutoCreateTag()) {
return Optional.empty();
}

return Optional.of(new TagsAutoCreatingProcess(tableRuntime, localEngine));
}

@Override
public void close() {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.process.iceberg;

import org.apache.amoro.Action;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.LocalProcess;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainerFactory;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/** Local table process for auto-creating Iceberg tags. */
public class TagsAutoCreatingProcess extends TableProcess implements LocalProcess {

private static final Logger LOG = LoggerFactory.getLogger(TagsAutoCreatingProcess.class);

public TagsAutoCreatingProcess(TableRuntime tableRuntime, ExecuteEngine engine) {
super(tableRuntime, engine);
}

@Override
public String tag() {
return getAction().getName().toLowerCase();
}

@Override
public void run() {
try {
AmoroTable<?> amoroTable = tableRuntime.loadTable();
TableMaintainer tableMaintainer = TableMaintainerFactory.create(amoroTable, tableRuntime);
tableMaintainer.autoCreateTags();
} catch (Throwable t) {
LOG.error("Failed to create tags on {}", tableRuntime.getTableIdentifier(), t);
throw new RuntimeException(t);
}
}

@Override
public Action getAction() {
return IcebergActions.AUTO_CREATE_TAGS;
}

@Override
public Map<String, String> getProcessParameters() {
return Maps.newHashMap();
}

@Override
public Map<String, String> getSummary() {
return Maps.newHashMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class InlineTableExecutors {
private OptimizingCommitExecutor optimizingCommitExecutor;
private ProcessDataExpiringExecutor processDataExpiringExecutor;
private HiveCommitSyncExecutor hiveCommitSyncExecutor;
private TagsAutoCreatingExecutor tagsAutoCreatingExecutor;

public static InlineTableExecutors getInstance() {
return instance;
Expand Down Expand Up @@ -71,13 +70,6 @@ public void setup(TableService tableService, Configurations conf) {
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
conf.get(AmoroManagementConf.REFRESH_TABLES_INTERVAL).toMillis(),
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
tableService,
conf.getInteger(AmoroManagementConf.AUTO_CREATE_TAGS_THREAD_COUNT),
conf.get(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL).toMillis());
}
}

public TableRuntimeRefreshExecutor getTableRefreshingExecutor() {
Expand All @@ -99,8 +91,4 @@ public ProcessDataExpiringExecutor getProcessDataExpiringExecutor() {
public HiveCommitSyncExecutor getHiveCommitSyncExecutor() {
return hiveCommitSyncExecutor;
}

public TagsAutoCreatingExecutor getTagsAutoCreatingExecutor() {
return tagsAutoCreatingExecutor;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class TestAmoroManagementConf {
private static final ConfigOption<Duration>[] TIME_RELATED_CONFIG_OPTIONS =
new ConfigOption[] {
AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL,
AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL,
AmoroManagementConf.REFRESH_TABLES_INTERVAL,
AmoroManagementConf.BLOCKER_TIMEOUT,
AmoroManagementConf.OPTIMIZER_HB_TIMEOUT,
Expand All @@ -57,7 +56,6 @@ public class TestAmoroManagementConf {
private static final Map<String, String> DEFAULT_TIME_UNIT_IN_OLD_VERSIONS =
ImmutableMap.<String, String>builder()
.put(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL.key(), "ms")
.put(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL.key(), "ms")
.put(AmoroManagementConf.REFRESH_TABLES_INTERVAL.key(), "ms")
.put(AmoroManagementConf.BLOCKER_TIMEOUT.key(), "ms")
.put(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT.key(), "ms")
Expand Down Expand Up @@ -228,7 +226,6 @@ private void assertStorageRelatedConfigs(
.put("optimizer.task-ack-timeout", "30000")
.put("optimizer.polling-timeout", "3000")
.put("blocker.timeout", "60000")
.put("auto-create-tags.interval", "60000")
.put("terminal.session.timeout", "1800000")
.build();

Expand All @@ -240,7 +237,6 @@ private void assertStorageRelatedConfigs(
.put("optimizer.task-ack-timeout", "60 s")
.put("optimizer.polling-timeout", "6 s")
.put("blocker.timeout", "2 min")
.put("auto-create-tags.interval", "2 min")
.put("terminal.session.timeout", "30 ms")
.build();

Expand All @@ -252,7 +248,6 @@ private void assertStorageRelatedConfigs(
.put("optimizer.task-ack-timeout", "60 s")
.put("optimizer.polling-timeout", "6 s")
.put("blocker.timeout", "2 min")
.put("auto-create-tags.interval", "2 min")
.put("terminal.session.timeout", "30 min")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.DataExpirationConfig;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.config.TagConfiguration;
import org.apache.amoro.process.LocalExecutionEngine;
import org.apache.amoro.process.ProcessTriggerStrategy;
import org.apache.amoro.process.RecoverProcessFailedException;
Expand Down Expand Up @@ -67,6 +68,8 @@ public void testTriggerActionWhenDue() {
DanglingDeleteFilesCleaningProcess.class,
0);
assertTriggerWhenDue("expire-data", IcebergActions.EXPIRE_DATA, DataExpiringProcess.class, 0);
assertTriggerWhenDue(
"auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, TagsAutoCreatingProcess.class, 0);
}

@Test
Expand All @@ -89,6 +92,7 @@ public void testTriggerActionDisabled() {
assertTriggerDisabled(
"clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, false, 0);
assertTriggerDisabled("expire-data", IcebergActions.EXPIRE_DATA, false, 0);
assertTriggerDisabled("auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, false, 0);
}

@Test
Expand Down Expand Up @@ -116,6 +120,12 @@ public void testRecoverDataExpiringProcess() {
assertRecover("expire-data", IcebergActions.EXPIRE_DATA, DataExpiringProcess.class);
}

@Test
public void testRecoverAutoCreateTagProcess() {
assertRecover(
"auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, TagsAutoCreatingProcess.class);
}

@Test
public void testRecoverUnsupportedActionThrows() {
IcebergProcessFactory factory = openedFactory("expire-snapshots");
Expand Down Expand Up @@ -258,6 +268,10 @@ private TableRuntime createRuntime(String configKey, boolean enabled, long lastT
tableConfiguration.setDeleteDanglingDeleteFilesEnabled(enabled);
} else if ("expire-data".equals(configKey)) {
tableConfiguration.setExpiringDataConfig(new DataExpirationConfig().setEnabled(enabled));
} else if ("auto-create-tags".equals(configKey)) {
TagConfiguration tagConfiguration = new TagConfiguration();
tagConfiguration.setAutoCreateTag(enabled);
tableConfiguration.setTagConfiguration(tagConfiguration);
}

TableRuntimeCleanupState cleanupState = new TableRuntimeCleanupState();
Expand All @@ -274,7 +288,9 @@ private TableRuntime createRuntime(String configKey, boolean enabled, long lastT
TableRuntime runtime = mock(TableRuntime.class);
doReturn(tableConfiguration).when(runtime).getTableConfiguration();
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);

if ("auto-create-tags".equals(configKey)) {
doReturn(TableFormat.ICEBERG).when(runtime).getFormat();
}
return runtime;
}
}
Loading
Loading