Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,6 @@ public class AmoroManagementConf {
.defaultValue(1000000)
.withDescription("The queue size of the executors of the external catalog explorer.");

/** Boolean option under key {@code sync-hive-tables.enabled}; defaults to {@code false}. */
public static final ConfigOption<Boolean> SYNC_HIVE_TABLES_ENABLED =
ConfigOptions.key("sync-hive-tables.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Enable synchronizing Hive tables.");

/** Integer option under key {@code sync-hive-tables.thread-count}; defaults to {@code 10}. */
public static final ConfigOption<Integer> SYNC_HIVE_TABLES_THREAD_COUNT =
ConfigOptions.key("sync-hive-tables.thread-count")
.intType()
.defaultValue(10)
.withDescription("The number of threads used for synchronizing Hive tables.");

public static final ConfigOption<Integer> REFRESH_TABLES_THREAD_COUNT =
ConfigOptions.key("refresh-tables.thread-count")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ public static void validateConfig(Configurations configurations) {

validateThreadCount(configurations, AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT);
validateThreadCount(configurations, AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT);

if (configurations.getBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED)) {
validateThreadCount(configurations, AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT);
}
}

private static void validateThreadCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ public void startOptimizingService() throws Exception {
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
tableService.initialize();
LOG.info("AMS table service have been initialized");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,69 +16,75 @@
* limitations under the License.
*/

package org.apache.amoro.server.scheduler.inline;
package org.apache.amoro.server.process.iceberg;

import org.apache.amoro.Action;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.hive.table.SupportHive;
import org.apache.amoro.hive.utils.HiveMetaSynchronizer;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.LocalProcess;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.MixedTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadLocalRandom;
import java.util.Map;

public class HiveCommitSyncExecutor extends PeriodicTableScheduler {
private static final Logger LOG = LoggerFactory.getLogger(HiveCommitSyncExecutor.class);
/** Local table process for syncing Iceberg metadata to Hive. */
public class HiveCommitSyncProcess extends TableProcess implements LocalProcess {

// 10 minutes
private static final long INTERVAL = 10 * 60 * 1000L;
private static final Logger LOG = LoggerFactory.getLogger(HiveCommitSyncProcess.class);

public HiveCommitSyncExecutor(TableService tableService, int poolSize) {
super(tableService, poolSize);
}

@Override
protected long getNextExecutingTime(TableRuntime tableRuntime) {
return INTERVAL;
/**
 * Pushes the data of a mixed table to Hive by delegating to
 * {@link HiveMetaSynchronizer#syncMixedTableDataToHive}.
 *
 * @param mixedTable table to synchronize; it is cast to {@link SupportHive}, so passing a table
 *     that does not implement it fails with a {@link ClassCastException}
 */
public static void syncIcebergToHive(MixedTable mixedTable) {
  SupportHive hiveBackedTable = (SupportHive) mixedTable;
  HiveMetaSynchronizer.syncMixedTableDataToHive(hiveBackedTable);
}

@Override
protected boolean enabled(TableRuntime tableRuntime) {
return true;
/**
 * Creates a Hive sync process, passing both arguments straight to the {@code TableProcess}
 * constructor.
 *
 * @param tableRuntime runtime of the table this process works on
 * @param engine engine handed to the superclass
 */
public HiveCommitSyncProcess(TableRuntime tableRuntime, ExecuteEngine engine) {
super(tableRuntime, engine);
}

@Override
protected long getExecutorDelay() {
return ThreadLocalRandom.current().nextLong(INTERVAL);
public String tag() {
  // Lower-case with Locale.ROOT so the tag never depends on the JVM default locale
  // (for example, 'I' lower-cases to a dotless 'ı' under a Turkish default locale).
  return getAction().getName().toLowerCase(java.util.Locale.ROOT);
}

@Override
protected void execute(TableRuntime tableRuntime) {
long startTime = System.currentTimeMillis();
public void run() {
ServerTableIdentifier tableIdentifier = tableRuntime.getTableIdentifier();
try {
MixedTable mixedTable = (MixedTable) loadTable(tableRuntime).originalTable();
AmoroTable<?> amoroTable = tableRuntime.loadTable();
MixedTable mixedTable = (MixedTable) amoroTable.originalTable();
if (!TableTypeUtil.isHive(mixedTable)) {
LOG.debug("{} is not a support hive table", tableIdentifier);
return;
}

LOG.info("{} start hive sync", tableIdentifier);
syncIcebergToHive(mixedTable);
} catch (Exception e) {
LOG.error("{} hive sync failed", tableIdentifier, e);
} finally {
LOG.info(
"{} hive sync finished, cost {}ms",
tableIdentifier,
System.currentTimeMillis() - startTime);
throw new RuntimeException(e);
}
}

public static void syncIcebergToHive(MixedTable mixedTable) {
HiveMetaSynchronizer.syncMixedTableDataToHive((SupportHive) mixedTable);
/** Identifies this process as the {@link IcebergActions#SYNC_HIVE_TABLES} action. */
@Override
public Action getAction() {
return IcebergActions.SYNC_HIVE_TABLES;
}

/** Returns a freshly created, empty map on every call; this process defines no parameters. */
@Override
public Map<String, String> getProcessParameters() {
return Maps.newHashMap();
}

/** Returns a freshly created, empty map on every call; no summary entries are produced. */
@Override
public Map<String, String> getSummary() {
return Maps.newHashMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ public class IcebergProcessFactory implements ProcessFactory {
.durationType()
.defaultValue(Duration.ofMinutes(1));

/** Boolean option under key {@code sync-hive-tables.enabled}; defaults to {@code false}. */
public static final ConfigOption<Boolean> SYNC_HIVE_TABLES_ENABLED =
ConfigOptions.key("sync-hive-tables.enabled").booleanType().defaultValue(false);

/** Duration option under key {@code sync-hive-tables.interval}; defaults to 10 minutes. */
public static final ConfigOption<Duration> SYNC_HIVE_TABLES_INTERVAL =
ConfigOptions.key("sync-hive-tables.interval")
.durationType()
.defaultValue(Duration.ofMinutes(10));

private ExecuteEngine localEngine;
private final Map<Action, ProcessTriggerStrategy> actions = Maps.newHashMap();
private final List<TableFormat> formats =
Expand Down Expand Up @@ -129,6 +137,8 @@ public Optional<TableProcess> trigger(TableRuntime tableRuntime, Action action)
return triggerDataExpiring(tableRuntime);
} else if (IcebergActions.AUTO_CREATE_TAGS.equals(action)) {
return triggerAutoCreateTag(tableRuntime);
} else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) {
return triggerHiveCommitSync(tableRuntime);
}

return Optional.empty();
Expand Down Expand Up @@ -158,6 +168,8 @@ public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store)
return new DataExpiringProcess(tableRuntime, localEngine);
} else if (IcebergActions.AUTO_CREATE_TAGS.equals(action)) {
return new TagsAutoCreatingProcess(tableRuntime, localEngine);
} else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) {
return new HiveCommitSyncProcess(tableRuntime, localEngine);
}

throw new RecoverProcessFailedException(
Expand Down Expand Up @@ -199,6 +211,12 @@ public void open(Map<String, String> properties) {
this.actions.put(
IcebergActions.AUTO_CREATE_TAGS, ProcessTriggerStrategy.triggerAtFixRate(interval));
}

if (configs.getBoolean(SYNC_HIVE_TABLES_ENABLED)) {
Duration interval = configs.getDuration(SYNC_HIVE_TABLES_INTERVAL);
this.actions.put(
IcebergActions.SYNC_HIVE_TABLES, ProcessTriggerStrategy.triggerAtFixRate(interval));
}
}

private Optional<TableProcess> triggerExpireSnapshot(TableRuntime tableRuntime) {
Expand Down Expand Up @@ -275,6 +293,14 @@ private Optional<TableProcess> triggerAutoCreateTag(TableRuntime tableRuntime) {
return Optional.of(new TagsAutoCreatingProcess(tableRuntime, localEngine));
}

/**
 * Builds the Hive sync process for a table.
 *
 * @param tableRuntime runtime of the table to synchronize
 * @return a new {@link HiveCommitSyncProcess} bound to the local engine, or an empty optional
 *     while no local engine is set
 */
private Optional<TableProcess> triggerHiveCommitSync(TableRuntime tableRuntime) {
  if (localEngine != null) {
    return Optional.of(new HiveCommitSyncProcess(tableRuntime, localEngine));
  }

  // Without a local engine there is nothing to run the process on.
  return Optional.empty();
}

@Override
public void close() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class InlineTableExecutors {
private BlockerExpiringExecutor blockerExpiringExecutor;
private OptimizingCommitExecutor optimizingCommitExecutor;
private ProcessDataExpiringExecutor processDataExpiringExecutor;
private HiveCommitSyncExecutor hiveCommitSyncExecutor;

public static InlineTableExecutors getInstance() {
return instance;
Expand Down Expand Up @@ -59,11 +58,6 @@ public void setup(TableService tableService, Configurations conf) {
new ProcessDataExpiringExecutor(
tableService, optimizingKeepTime, expireInterval, processKeepTime);
this.blockerExpiringExecutor = new BlockerExpiringExecutor(tableService);
if (conf.getBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED)) {
this.hiveCommitSyncExecutor =
new HiveCommitSyncExecutor(
tableService, conf.getInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT));
}
this.tableRefreshingExecutor =
new TableRuntimeRefreshExecutor(
tableService,
Expand All @@ -87,8 +81,4 @@ public OptimizingCommitExecutor getOptimizingCommitExecutor() {
public ProcessDataExpiringExecutor getProcessDataExpiringExecutor() {
return processDataExpiringExecutor;
}

/**
 * Returns the Hive commit sync executor field as-is.
 *
 * <p>NOTE(review): the visible setup code assigns this field only when
 * {@code sync-hive-tables.enabled} is true, so the result appears to be {@code null} otherwise;
 * confirm against the full class.
 */
public HiveCommitSyncExecutor getHiveCommitSyncExecutor() {
return hiveCommitSyncExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ private String getAmsConfig() {
+ " refresh-table-thread-count: 10\n"
+ " refresh-table-interval: 60000 #1min\n"
+ " expire-table-thread-count: 10\n"
+ " sync-hive-tables-thread-count: 10\n"
+ "\n"
+ " thrift-server:\n"
+ " max-message-size: 104857600 # 100MB\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,5 @@ public void testValidateThreadCount() {
() -> AmoroManagementConfValidator.validateConfig(configurations));
configurations.setInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT, 10);
AmoroManagementConfValidator.validateConfig(configurations);

configurations.setBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED, true);
configurations.setInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT, -1);
Assert.assertThrows(
IllegalArgumentException.class,
() -> AmoroManagementConfValidator.validateConfig(configurations));
configurations.setInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT, 10);
AmoroManagementConfValidator.validateConfig(configurations);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void testOpenAndSupportedActions() {
assertSupportedAction(
"clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, Duration.ofHours(24));
assertSupportedAction("expire-data", IcebergActions.EXPIRE_DATA, Duration.ofHours(24));
assertSupportedAction("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, Duration.ofHours(1));
}

@Test
Expand All @@ -70,6 +71,8 @@ public void testTriggerActionWhenDue() {
assertTriggerWhenDue("expire-data", IcebergActions.EXPIRE_DATA, DataExpiringProcess.class, 0);
assertTriggerWhenDue(
"auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, TagsAutoCreatingProcess.class, 0);
assertTriggerWhenDue(
"sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, HiveCommitSyncProcess.class, 0);
}

@Test
Expand Down Expand Up @@ -126,6 +129,11 @@ public void testRecoverAutoCreateTagProcess() {
"auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, TagsAutoCreatingProcess.class);
}

/**
 * Runs the shared {@code assertRecover} check for the {@code sync-hive-tables} prefix, the
 * {@code SYNC_HIVE_TABLES} action and {@code HiveCommitSyncProcess} as the expected class.
 */
@Test
public void testRecoverSyncHiveProcess() {
assertRecover("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, HiveCommitSyncProcess.class);
}

@Test
public void testRecoverUnsupportedActionThrows() {
IcebergProcessFactory factory = openedFactory("expire-snapshots");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class IcebergActions {
public static final Action SYSTEM = Action.register("system");
public static final Action REWRITE = Action.register("rewrite");
public static final Action CLEAN_ORPHAN = Action.register("clean-orphan-files");
public static final Action SYNC_HIVE = Action.register("sync-hive");
public static final Action SYNC_HIVE_TABLES = Action.register("sync-hive-tables");
public static final Action EXPIRE_DATA = Action.register("expire-data");
public static final Action EXPIRE_SNAPSHOTS = Action.register("expire-snapshots");
public static final Action CLEAN_DANGLING_DELETE = Action.register("clean-dangling-delete-files");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public void testSubmitUsesCustomPoolByTag() throws Exception {
assertCustomPoolByTag("clean-dangling-delete-files");
assertCustomPoolByTag("expire-data");
assertCustomPoolByTag("auto-create-tags");
assertCustomPoolByTag("sync-hive-tables");
}

private void assertCustomPoolByTag(String tag) throws Exception {
Expand Down Expand Up @@ -160,6 +161,7 @@ private LocalExecutionEngine createEngineWithTtl(String ttl) {
properties.put("pool.clean-dangling-delete-files.thread-count", "1");
properties.put("pool.expire-data.thread-count", "1");
properties.put("pool.auto-create-tags.thread-count", "1");
properties.put("pool.sync-hive-tables.thread-count", "1");
properties.put("process.status.ttl", ttl);
localEngine.open(properties);
return localEngine;
Expand Down
4 changes: 0 additions & 4 deletions charts/amoro/templates/amoro-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ data:
timeout: 1min # 60000

# optional features
sync-hive-tables:
enabled: false
thread-count: 10

table-manifest-io:
thread-count: 20

Expand Down
4 changes: 0 additions & 4 deletions dist/src/main/amoro-bin/conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ ams:
~

# optional features
sync-hive-tables:
enabled: false
thread-count: 10

table-manifest-io:
thread-count: 20

Expand Down
3 changes: 2 additions & 1 deletion dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ execute-engines:
pool.clean-orphan-files.thread-count: 10
pool.clean-dangling-delete-files.thread-count: 10
pool.expire-data.thread-count: 10
pool.auto-create-tags.thread-count: 3
pool.auto-create-tags.thread-count: 3
pool.sync-hive-tables.thread-count: 10
2 changes: 2 additions & 0 deletions dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ process-factories:
expire-data.interval: "1d"
auto-create-tags.enabled: "true"
auto-create-tags.interval: "1min"
sync-hive-tables.enabled: "false"
sync-hive-tables.interval: "10min"
9 changes: 6 additions & 3 deletions docs/admin-guides/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ process-factories:
expire-data.interval: "1d" # interval for data expiration
auto-create-tags.enabled: "true" # enable auto creating tags
auto-create-tags.interval: "1m" # interval for auto creating tags
sync-hive-tables.enabled: "false" # enable synchronizing Hive tables
sync-hive-tables.interval: "10min" # interval for synchronizing Hive tables
```

{{< hint info >}}
Expand Down Expand Up @@ -308,11 +310,12 @@ execute-engines:
priority: 100
properties:
pool.default.thread-count: 10 # default thread pool size
pool.expire-snapshots.thread-count: 10 # thread pool for snapshot expiration
pool.clean-orphan-files.thread-count: 10 # thread pool for orphan file cleaning
pool.expire-snapshots.thread-count: 10 # thread pool for snapshot expiration
pool.clean-orphan-files.thread-count: 10 # thread pool for orphan file cleaning
pool.clean-dangling-delete-files.thread-count: 10 # thread pool for dangling delete files cleaning
pool.expire-data.thread-count: 10 # thread pool for data expiration
pool.auto-create-tags.thread-count: 3 # thread pool for auto creating tags
pool.auto-create-tags.thread-count: 3 # thread pool for auto creating tags
pool.sync-hive-tables.thread-count: 10 # thread pool for synchronizing Hive tables
process.status.ttl: 4h # TTL for process status cache
```

Expand Down
2 changes: 0 additions & 2 deletions docs/configuration/ams-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
| self-optimizing.runtime-data-keep-time | 30 d | Duration that self-optimizing runtime data is retained. |
| server-bind-host | 0.0.0.0 | The host bound to the server. |
| server-expose-host | | The exposed host of the server. |
| sync-hive-tables.enabled | false | Enable synchronizing Hive tables. |
| sync-hive-tables.thread-count | 10 | The number of threads used for synchronizing Hive tables. |
| table-manifest-io.thread-count | 20 | Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing manifests in the base table implementation across all concurrent planning or commit operations. |
| terminal.backend | local | Terminal backend implementation. local, kyuubi and custom are valid values. |
| terminal.factory | &lt;undefined&gt; | Session factory implement of terminal, `terminal.backend` must be `custom` if this is set. |
Expand Down
Loading