Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,28 @@ private void loadOptimizingQueues(List<DefaultTableRuntime> tableRuntimeList) {
optimizingQueueByGroup.put(groupName, optimizingQueue);
});
optimizers.forEach(optimizer -> registerOptimizer(optimizer, false));
groupToTableRuntimes
.keySet()
.forEach(groupName -> LOG.warn("Unloaded task runtime in group {}", groupName));
// Avoid keeping the tables in processing/pending status forever in below cases:
// 1) Resource group does not exist
// 2) The AMS restarts after the tables disable self-optimizing but before the optimizing
// process is closed, which may cause the optimizing status of the tables to be still
// PLANNING/PENDING after AMS is restarted.
groupToTableRuntimes.forEach(
(groupName, trs) -> {
trs.stream()
.filter(
tr ->
tr.getOptimizingStatus() == OptimizingStatus.PLANNING
|| tr.getOptimizingStatus() == OptimizingStatus.PENDING)
.forEach(
tr -> {
LOG.warn(
"Release {} optimizing process for table {}, since its resource group {} does not exist",
tr.getOptimizingStatus().name(),
tr.getTableIdentifier(),
groupName);
tr.completeEmptyProcess();
});
});
}

private void registerOptimizer(OptimizerInstance optimizer, boolean needPersistent) {
Expand Down Expand Up @@ -379,11 +398,22 @@ public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus orig
public void handleConfigChanged(TableRuntime runtime, TableConfiguration originalConfig) {
DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
String originalGroup = originalConfig.getOptimizingConfig().getOptimizerGroup();
Optional<OptimizingQueue> newQueue = getOptionalQueueByGroup(tableRuntime.getGroupName());
if (!tableRuntime.getGroupName().equals(originalGroup)) {
getOptionalQueueByGroup(originalGroup).ifPresent(q -> q.releaseTable(tableRuntime));
// If the new group doesn't exist, close the process to avoid the table in limbo(PENDING)
// status.
if (newQueue.isEmpty()) {
LOG.warn(
"Cannot find the resource group: {}, try to release optimizing process of table {} directly",
tableRuntime.getGroupName(),
tableRuntime.getTableIdentifier());
tableRuntime.completeEmptyProcess();
}
}
getOptionalQueueByGroup(tableRuntime.getGroupName())
.ifPresent(q -> q.refreshTable(tableRuntime));

// Binding new queue if the new group exists
newQueue.ifPresent(q -> q.refreshTable(tableRuntime));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,30 @@

package org.apache.amoro.server;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.api.OptimizerRegisterInfo;
import org.apache.amoro.api.OptimizingTask;
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.api.OptimizingTaskResult;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.exception.IllegalTaskStateException;
import org.apache.amoro.exception.PluginRetryAuthException;
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.TableOptimizing;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
Expand All @@ -45,6 +53,7 @@
import org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor;
import org.apache.amoro.server.table.AMSTableTestBase;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.MixedTable;
Expand Down Expand Up @@ -595,6 +604,79 @@ public void testReloadFailedTask() {
assertTaskStatus(TaskRuntime.Status.PLANNED);
}

/**
* Test handleConfigChanged when the optimizer group changes to a different existing group. The
* table should be released from the old group's queue and from the new group's queue.
*/
@Test
public void testHandleConfigChangedGroupChanged() {
// Create a new resource group
ResourceGroup newGroup = new ResourceGroup.Builder("test-new-group", "local").build();
try {
optimizerManager().createResourceGroup(newGroup);
} catch (Throwable ignored) {
}
optimizingService().createResourceGroup(newGroup);

try {
TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId());
String originalGroup = tableRuntime.getGroupName();

// Build original config with the old group name
OptimizingConfig originalOptConfig = new OptimizingConfig();
originalOptConfig.setOptimizerGroup(originalGroup);
TableConfiguration originalConfig = new TableConfiguration();
originalConfig.setOptimizingConfig(originalOptConfig);

// Simulate that the table now belongs to the new group
TableRuntime spyRuntime = spy(tableRuntime);
doReturn("test-new-group").when(spyRuntime).getGroupName();
doReturn(TableFormat.ICEBERG).when(spyRuntime).getFormat();

// Fire config changed (group changed from "default" to "test-new-group")
RuntimeHandlerChain handler = optimizingService().getTableRuntimeHandler();
handler.fireConfigChanged(spyRuntime, originalConfig);

// No exception should be thrown; table should be released from both old and new queue
} finally {
optimizingService().deleteResourceGroup("test-new-group");
try {
optimizerManager().deleteResourceGroup("test-new-group");
} catch (Throwable ignored) {
}
}
}

/**
* Test handleConfigChanged when the new optimizer group does not exist. The table runtime's
* completeEmptyProcess() should be called.
*/
@Test
public void testHandleConfigChangedGroupNotExist() {
DefaultTableRuntime tableRuntime =
(DefaultTableRuntime) tableService().getRuntime(serverTableIdentifier().getId());
String originalGroup = tableRuntime.getGroupName();

// Build original config with the original group
OptimizingConfig originalOptConfig = new OptimizingConfig();
originalOptConfig.setOptimizerGroup(originalGroup);
TableConfiguration originalConfig = new TableConfiguration();
originalConfig.setOptimizingConfig(originalOptConfig);

// Simulate that the table now belongs to a non-existing group
DefaultTableRuntime spyRuntime = spy(tableRuntime);
doReturn("non-existing-group").when(spyRuntime).getGroupName();
doReturn(TableFormat.ICEBERG).when(spyRuntime).getFormat();
doReturn(serverTableIdentifier()).when(spyRuntime).getTableIdentifier();

// Fire config changed (group changed from "default" to "non-existing-group")
RuntimeHandlerChain handler = optimizingService().getTableRuntimeHandler();
handler.fireConfigChanged(spyRuntime, originalConfig);

// Verify that completeEmptyProcess was called on the spy
verify(spyRuntime).completeEmptyProcess();
}

private OptimizerRegisterInfo buildRegisterInfo() {
OptimizerRegisterInfo registerInfo = new OptimizerRegisterInfo();
Map<String, String> registerProperties = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

@RunWith(Parameterized.class)
public class TestOptimizingQueue extends AMSTableTestBase {
Expand Down Expand Up @@ -537,6 +538,48 @@ public void testAddAndRemoveOptimizers() {
queue.dispose();
}

@Test
public void testReleaseOrphanedPlanningTableOnRestart() {
// Scenario: Table config was changed (optimizer group: "old_group" -> "default"),
// the optimizing process was closed but table runtime is persisted with "old_group" and
// PLANNING status
ResourceGroup oldGroup = new ResourceGroup.Builder("old_group", "local").build();
// reset optimizer group to "default" to simulate the scenario where the self-optimizing configs
// have been cleared
DefaultTableRuntime tableRuntime =
buildTableRuntimeMeta(OptimizingStatus.PLANNING, defaultResourceGroup());
Assert.assertEquals(OptimizingStatus.PLANNING, tableRuntime.getOptimizingStatus());
Assert.assertEquals("default", tableRuntime.getGroupName());

List<DefaultTableRuntime> released =
simulateLoadOptimizingQueuesForNonExistentGroup(
Collections.singletonList(tableRuntime), oldGroup);

Assert.assertEquals(1, released.size());
Assert.assertEquals(OptimizingStatus.IDLE, tableRuntime.getOptimizingStatus());
}

@Test
public void testReleaseOrphanedPendingTableOnRestart() {
// Scenario: Table config was changed (optimizer group: "old_group" -> "default"),
// the optimizing process was closed but table runtime is persisted with "default" and PENDING
// status
ResourceGroup oldGroup = new ResourceGroup.Builder("old_group", "local").build();
// reset optimizer group to "default" to simulate the scenario where the self-optimizing configs
// have been cleared
DefaultTableRuntime tableRuntime =
buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup());
Assert.assertEquals(OptimizingStatus.PENDING, tableRuntime.getOptimizingStatus());
Assert.assertEquals("default", tableRuntime.getGroupName());

List<DefaultTableRuntime> released =
simulateLoadOptimizingQueuesForNonExistentGroup(
Collections.singletonList(tableRuntime), oldGroup);

Assert.assertEquals(1, released.size());
Assert.assertEquals(OptimizingStatus.IDLE, tableRuntime.getOptimizingStatus());
}

protected DefaultTableRuntime initTableWithFiles() {
MixedTable mixedTable =
(MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable();
Expand Down Expand Up @@ -646,6 +689,41 @@ private OptimizingTaskResult buildOptimizingTaskResult(OptimizingTaskId taskId,
return optimizingTaskResult;
}

/**
* Simulate the loadOptimizingQueues logic: tables whose persisted optimizer group no longer
* exists (e.g., table config changed from an old deleted group to "default", but AMS restarted
* before the optimizing process was closed) remain in the leftover groupToTableRuntimes map.
* These PLANNING/PENDING tables should be released to IDLE via completeEmptyProcess().
*/
private List<DefaultTableRuntime> simulateLoadOptimizingQueuesForNonExistentGroup(
List<DefaultTableRuntime> tableRuntimes, ResourceGroup resourceGroup) {
// Only the created resource group is returned
List<ResourceGroup> existingGroups = Collections.singletonList(resourceGroup);

// Group tables by their persisted optimizer group
Map<String, List<DefaultTableRuntime>> groupToTableRuntimes =
tableRuntimes.stream().collect(Collectors.groupingBy(DefaultTableRuntime::getGroupName));

// Remove groups that exist — same logic as loadOptimizingQueues
existingGroups.forEach(group -> groupToTableRuntimes.remove(group.getName()));

// Release PLANNING/PENDING tables in non-existent groups — same logic as loadOptimizingQueues
List<DefaultTableRuntime> released = new ArrayList<>();
groupToTableRuntimes.forEach(
(groupName, trs) ->
trs.stream()
.filter(
tr ->
tr.getOptimizingStatus() == OptimizingStatus.PLANNING
|| tr.getOptimizingStatus() == OptimizingStatus.PENDING)
.forEach(
tr -> {
tr.completeEmptyProcess();
released.add(tr);
}));
return released;
}

private OptimizingTaskResult buildOptimizingTaskFailed(OptimizingTaskId taskId, int threadId) {
OptimizingTaskResult optimizingTaskResult = new OptimizingTaskResult(taskId, threadId);
optimizingTaskResult.setErrorMessage("error");
Expand Down