diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java index c2636701bf3..1668a1e9fb3 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java @@ -58,6 +58,7 @@ import org.apache.ozone.recon.schema.generated.tables.daos.ReconTaskStatusDao; import org.apache.ozone.recon.schema.generated.tables.pojos.ReconTaskStatus; import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -96,6 +97,13 @@ public void setUp() throws IOException { reconTaskController.start(); } + @AfterEach + public void tearDown() { + if (reconTaskController != null) { + reconTaskController.stop(); + } + } + @Test public void testStopCompletesPromptly() { // stop() must not block on the graceful shutdown timeout. The event @@ -122,15 +130,10 @@ public void testRegisterTask() { @Test public void testConsumeOMEvents() throws Exception { - // Use CountDownLatch to wait for async processing - CountDownLatch taskCompletionLatch = new CountDownLatch(1); - ReconOmTask reconOmTaskMock = getMockTask("MockTask"); when(reconOmTaskMock.process(any(OMUpdateEventBatch.class), anyMap())) - .thenAnswer(invocation -> { - taskCompletionLatch.countDown(); // Signal task completion - return new ReconOmTask.TaskResult.Builder().setTaskName("MockTask").setTaskSuccess(true).build(); - }); + .thenReturn(new ReconOmTask.TaskResult.Builder() + .setTaskName("MockTask").setTaskSuccess(true).build()); reconTaskController.registerTask(reconOmTaskMock); OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); @@ -145,10 +148,17 @@ public void testConsumeOMEvents() throws Exception { omUpdateEventBatchMock, mock(OMMetadataManager.class)); - // Wait for async processing to complete using latch - boolean completed = taskCompletionLatch.await(10, TimeUnit.SECONDS); - assertThat(completed).isTrue(); - + GenericTestUtils.waitFor(() -> { + try { + ReconTaskStatus status = reconTaskStatusDao.findById("MockTask"); + return status != null + && status.getLastTaskRunStatus() == 0 + && status.getLastUpdatedSeqNumber() == 100L; + } catch (Exception e) { + return false; + } + }, 100, 5000); + verify(reconOmTaskMock, times(1)) .process(any(), anyMap()); long endTime = System.currentTimeMillis(); @@ -236,9 +246,17 @@ public void testFailedTaskRetryLogic() throws Exception { reconTaskController.consumeOMEvents(omUpdateEventBatchMock, mock(OMMetadataManager.class)); - // Wait for async processing to complete - Thread.sleep(3000); // Increase timeout for retry logic - + GenericTestUtils.waitFor(() -> { + try { + ReconTaskStatus status = reconTaskStatusDao.findById(taskName); + return status != null + && status.getLastTaskRunStatus() == 0 + && status.getLastUpdatedSeqNumber() == 100L; + } catch (Exception e) { + return false; + } + }, 100, 5000); + assertThat(reconTaskController.getRegisteredTasks()).isNotEmpty(); assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks() .get(dummyReconDBTask.getTaskName())); @@ -350,6 +368,7 @@ public void testQueueReInitializationEventSuccess() throws Exception { when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath); reconTaskController.updateOMMetadataManager(mockOMMetadataManager); + reconTaskController.stop(); // Test successful queueing - the checkpoint creation should work with proper mocks ReconTaskController.ReInitializationResult result = reconTaskController.queueReInitializationEvent( @@ -377,6 +396,7 @@ public void testQueueReInitializationEventCheckpointFailure() throws Exception { when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath); reconTaskController.updateOMMetadataManager(mockOMMetadataManager); + reconTaskController.stop(); // Create a spy of the controller to mock checkpoint creation failure ReconTaskControllerImpl controllerSpy = spy((ReconTaskControllerImpl) reconTaskController); @@ -471,6 +491,7 @@ public void testUpdateOMMetadataManager() throws Exception { // Update with first manager reconTaskController.updateOMMetadataManager(mockManager1); + reconTaskController.stop(); // Test that the manager was updated correctly by attempting to queue a reinitialization event ReconTaskController.ReInitializationResult result = reconTaskController.queueReInitializationEvent( @@ -495,6 +516,7 @@ public void testCheckpointManagerCleanupOnQueueFailure() throws Exception { when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath); reconTaskController.updateOMMetadataManager(mockOMMetadataManager); + reconTaskController.stop(); // This test verifies the successful path - in practice, queue failure after clear is very rare // since we clear the buffer before queueing the reinitialization event @@ -524,6 +546,7 @@ public void testNewRetryLogicWithSuccessfulCheckpoint() throws Exception { ReconTaskControllerImpl controllerImpl = (ReconTaskControllerImpl) reconTaskController; controllerImpl.updateOMMetadataManager(mockOMMetadataManager); + controllerImpl.stop(); // Reset any previous retry state controllerImpl.resetRetryCounters(); @@ -545,6 +568,7 @@ public void testNewRetryLogicWithMaxRetriesExceeded() throws Exception { ReconOMMetadataManager mockOMMetadataManager = mock(ReconOMMetadataManager.class); ReconTaskControllerImpl controllerImpl = (ReconTaskControllerImpl) reconTaskController; controllerImpl.updateOMMetadataManager(mockOMMetadataManager); + controllerImpl.stop(); // Reset any previous retry state controllerImpl.resetRetryCounters();