diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index 9be63dbaf3..28ac43d742 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -163,9 +163,28 @@ private void loadOptimizingQueues(List tableRuntimeList) { optimizingQueueByGroup.put(groupName, optimizingQueue); }); optimizers.forEach(optimizer -> registerOptimizer(optimizer, false)); - groupToTableRuntimes - .keySet() - .forEach(groupName -> LOG.warn("Unloaded task runtime in group {}", groupName)); + // Avoid leaving tables in PLANNING/PENDING status forever in the cases below: + // 1) The resource group does not exist. + // 2) AMS restarts after a table disables self-optimizing but before its optimizing + // process is closed, which may leave the table's optimizing status as + // PLANNING/PENDING after AMS is restarted. 
+ groupToTableRuntimes.forEach( + (groupName, trs) -> { + trs.stream() + .filter( + tr -> + tr.getOptimizingStatus() == OptimizingStatus.PLANNING + || tr.getOptimizingStatus() == OptimizingStatus.PENDING) + .forEach( + tr -> { + LOG.warn( + "Release {} optimizing process for table {}, since its resource group {} does not exist", + tr.getOptimizingStatus().name(), + tr.getTableIdentifier(), + groupName); + tr.completeEmptyProcess(); + }); + }); } private void registerOptimizer(OptimizerInstance optimizer, boolean needPersistent) { @@ -379,11 +398,22 @@ public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus orig public void handleConfigChanged(TableRuntime runtime, TableConfiguration originalConfig) { DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime; String originalGroup = originalConfig.getOptimizingConfig().getOptimizerGroup(); + Optional newQueue = getOptionalQueueByGroup(tableRuntime.getGroupName()); if (!tableRuntime.getGroupName().equals(originalGroup)) { getOptionalQueueByGroup(originalGroup).ifPresent(q -> q.releaseTable(tableRuntime)); + // If the new group doesn't exist, complete the process to avoid leaving the table in + // limbo (PENDING) status. 
+ if (newQueue.isEmpty()) { + LOG.warn( + "Cannot find the resource group: {}, try to release optimizing process of table {} directly", + tableRuntime.getGroupName(), + tableRuntime.getTableIdentifier()); + tableRuntime.completeEmptyProcess(); + } } - getOptionalQueueByGroup(tableRuntime.getGroupName()) - .ifPresent(q -> q.refreshTable(tableRuntime)); + + // Refresh the table in the new queue if the new group exists + newQueue.ifPresent(q -> q.refreshTable(tableRuntime)); } @Override diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 2116631de9..417150b390 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -18,9 +18,14 @@ package org.apache.amoro.server; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + import org.apache.amoro.BasicTableTestHelper; import org.apache.amoro.OptimizerProperties; import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; import org.apache.amoro.TableTestHelper; import org.apache.amoro.api.OptimizerRegisterInfo; import org.apache.amoro.api.OptimizingTask; @@ -28,12 +33,15 @@ import org.apache.amoro.api.OptimizingTaskResult; import org.apache.amoro.catalog.BasicCatalogTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.config.OptimizingConfig; +import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.exception.IllegalTaskStateException; import org.apache.amoro.exception.PluginRetryAuthException; import org.apache.amoro.io.MixedDataTestHelpers; import org.apache.amoro.optimizing.RewriteFilesOutput; import org.apache.amoro.optimizing.TableOptimizing; import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.resource.ResourceGroup; 
import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.persistence.SqlSessionFactoryProvider; @@ -45,6 +53,7 @@ import org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor; import org.apache.amoro.server.table.AMSTableTestBase; import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.table.MixedTable; @@ -595,6 +604,79 @@ public void testReloadFailedTask() { assertTaskStatus(TaskRuntime.Status.PLANNED); } + /** + * Test handleConfigChanged when the optimizer group changes to a different existing group. The + * table should be released from the old group's queue and refreshed in the new group's queue. + */ + @Test + public void testHandleConfigChangedGroupChanged() { + // Create a new resource group + ResourceGroup newGroup = new ResourceGroup.Builder("test-new-group", "local").build(); + try { + optimizerManager().createResourceGroup(newGroup); + } catch (Throwable ignored) { + } + optimizingService().createResourceGroup(newGroup); + + try { + TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId()); + String originalGroup = tableRuntime.getGroupName(); + + // Build original config with the old group name + OptimizingConfig originalOptConfig = new OptimizingConfig(); + originalOptConfig.setOptimizerGroup(originalGroup); + TableConfiguration originalConfig = new TableConfiguration(); + originalConfig.setOptimizingConfig(originalOptConfig); + + // Simulate that the table now belongs to the new group + TableRuntime spyRuntime = spy(tableRuntime); + doReturn("test-new-group").when(spyRuntime).getGroupName(); + doReturn(TableFormat.ICEBERG).when(spyRuntime).getFormat(); + + // Fire config changed (group changed from 
"default" to "test-new-group") + RuntimeHandlerChain handler = optimizingService().getTableRuntimeHandler(); + handler.fireConfigChanged(spyRuntime, originalConfig); + + // Only verifies that no exception is thrown; the queue release/refresh is not asserted + } finally { + optimizingService().deleteResourceGroup("test-new-group"); + try { + optimizerManager().deleteResourceGroup("test-new-group"); + } catch (Throwable ignored) { + } + } + } + + /** + * Test handleConfigChanged when the new optimizer group does not exist. The table runtime's + * completeEmptyProcess() should be called. + */ + @Test + public void testHandleConfigChangedGroupNotExist() { + DefaultTableRuntime tableRuntime = + (DefaultTableRuntime) tableService().getRuntime(serverTableIdentifier().getId()); + String originalGroup = tableRuntime.getGroupName(); + + // Build original config with the original group + OptimizingConfig originalOptConfig = new OptimizingConfig(); + originalOptConfig.setOptimizerGroup(originalGroup); + TableConfiguration originalConfig = new TableConfiguration(); + originalConfig.setOptimizingConfig(originalOptConfig); + + // Simulate that the table now belongs to a non-existing group + DefaultTableRuntime spyRuntime = spy(tableRuntime); + doReturn("non-existing-group").when(spyRuntime).getGroupName(); + doReturn(TableFormat.ICEBERG).when(spyRuntime).getFormat(); + doReturn(serverTableIdentifier()).when(spyRuntime).getTableIdentifier(); + + // Fire config changed (group changed from "default" to "non-existing-group") + RuntimeHandlerChain handler = optimizingService().getTableRuntimeHandler(); + handler.fireConfigChanged(spyRuntime, originalConfig); + + // Verify that completeEmptyProcess was called on the spy + verify(spyRuntime).completeEmptyProcess(); + } + private OptimizerRegisterInfo buildRegisterInfo() { OptimizerRegisterInfo registerInfo = new OptimizerRegisterInfo(); Map registerProperties = Maps.newHashMap(); diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java index ddf8fe9852..807e4888b0 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java @@ -74,6 +74,7 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.stream.Collectors; @RunWith(Parameterized.class) public class TestOptimizingQueue extends AMSTableTestBase { @@ -537,6 +538,48 @@ public void testAddAndRemoveOptimizers() { queue.dispose(); } + @Test + public void testReleaseOrphanedPlanningTableOnRestart() { + // Scenario: Table config was changed (optimizer group: "old_group" -> "default"), + // AMS restarted before the process was closed, so the runtime is persisted with "default" and + // PLANNING status + ResourceGroup oldGroup = new ResourceGroup.Builder("old_group", "local").build(); + // reset optimizer group to "default" to simulate the scenario where the self-optimizing configs + // have been cleared + DefaultTableRuntime tableRuntime = + buildTableRuntimeMeta(OptimizingStatus.PLANNING, defaultResourceGroup()); + Assert.assertEquals(OptimizingStatus.PLANNING, tableRuntime.getOptimizingStatus()); + Assert.assertEquals("default", tableRuntime.getGroupName()); + + List released = + simulateLoadOptimizingQueuesForNonExistentGroup( + Collections.singletonList(tableRuntime), oldGroup); + + Assert.assertEquals(1, released.size()); + Assert.assertEquals(OptimizingStatus.IDLE, tableRuntime.getOptimizingStatus()); + } + + @Test + public void testReleaseOrphanedPendingTableOnRestart() { + // Scenario: Table config was changed (optimizer group: "old_group" -> "default"), + // AMS restarted before the process was closed, so the runtime is persisted with "default" and + // PENDING status + ResourceGroup oldGroup = new 
ResourceGroup.Builder("old_group", "local").build(); + // reset optimizer group to "default" to simulate the scenario where the self-optimizing configs + // have been cleared + DefaultTableRuntime tableRuntime = + buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); + Assert.assertEquals(OptimizingStatus.PENDING, tableRuntime.getOptimizingStatus()); + Assert.assertEquals("default", tableRuntime.getGroupName()); + + List released = + simulateLoadOptimizingQueuesForNonExistentGroup( + Collections.singletonList(tableRuntime), oldGroup); + + Assert.assertEquals(1, released.size()); + Assert.assertEquals(OptimizingStatus.IDLE, tableRuntime.getOptimizingStatus()); + } + protected DefaultTableRuntime initTableWithFiles() { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); @@ -646,6 +689,41 @@ private OptimizingTaskResult buildOptimizingTaskResult(OptimizingTaskId taskId, return optimizingTaskResult; } + /** + * Simulate the loadOptimizingQueues logic: tables whose persisted optimizer group no longer + * exists (e.g., table config changed from an old deleted group to "default", but AMS restarted + * before the optimizing process was closed) remain in the leftover groupToTableRuntimes map. + * These PLANNING/PENDING tables should be released to IDLE via completeEmptyProcess(). 
+ */ + private List simulateLoadOptimizingQueuesForNonExistentGroup( + List tableRuntimes, ResourceGroup resourceGroup) { + // Treat only the given resource group as existing + List existingGroups = Collections.singletonList(resourceGroup); + + // Group tables by their persisted optimizer group + Map> groupToTableRuntimes = + tableRuntimes.stream().collect(Collectors.groupingBy(DefaultTableRuntime::getGroupName)); + + // Remove groups that exist — mirrors the logic in loadOptimizingQueues + existingGroups.forEach(group -> groupToTableRuntimes.remove(group.getName())); + + // Release PLANNING/PENDING tables in non-existent groups — mirrors loadOptimizingQueues + List released = new ArrayList<>(); + groupToTableRuntimes.forEach( + (groupName, trs) -> + trs.stream() + .filter( + tr -> + tr.getOptimizingStatus() == OptimizingStatus.PLANNING + || tr.getOptimizingStatus() == OptimizingStatus.PENDING) + .forEach( + tr -> { + tr.completeEmptyProcess(); + released.add(tr); + })); + return released; + } + private OptimizingTaskResult buildOptimizingTaskFailed(OptimizingTaskId taskId, int threadId) { OptimizingTaskResult optimizingTaskResult = new OptimizingTaskResult(taskId, threadId); optimizingTaskResult.setErrorMessage("error");