From 0a6748ee9f397ee61922b324f5c31e4eb35b6bcc Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Thu, 4 Jun 2026 21:51:56 +0800 Subject: [PATCH 1/2] HDDS-15449. Avoid leaked event-processing thread and async work outliving tests in TestReconTaskControllerImpl --- .../tasks/TestReconTaskControllerImpl.java | 56 +++++++++++++------ 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java index c2636701bf3..341e6e3e527 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java @@ -58,6 +58,7 @@ import org.apache.ozone.recon.schema.generated.tables.daos.ReconTaskStatusDao; import org.apache.ozone.recon.schema.generated.tables.pojos.ReconTaskStatus; import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -96,6 +97,13 @@ public void setUp() throws IOException { reconTaskController.start(); } + @AfterEach + public void tearDown() { + if (reconTaskController != null) { + reconTaskController.stop(); + } + } + @Test public void testStopCompletesPromptly() { // stop() must not block on the graceful shutdown timeout. 
The event @@ -122,15 +130,10 @@ public void testRegisterTask() { @Test public void testConsumeOMEvents() throws Exception { - // Use CountDownLatch to wait for async processing - CountDownLatch taskCompletionLatch = new CountDownLatch(1); - ReconOmTask reconOmTaskMock = getMockTask("MockTask"); when(reconOmTaskMock.process(any(OMUpdateEventBatch.class), anyMap())) - .thenAnswer(invocation -> { - taskCompletionLatch.countDown(); // Signal task completion - return new ReconOmTask.TaskResult.Builder().setTaskName("MockTask").setTaskSuccess(true).build(); - }); + .thenReturn(new ReconOmTask.TaskResult.Builder() + .setTaskName("MockTask").setTaskSuccess(true).build()); reconTaskController.registerTask(reconOmTaskMock); OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); @@ -145,10 +148,17 @@ public void testConsumeOMEvents() throws Exception { omUpdateEventBatchMock, mock(OMMetadataManager.class)); - // Wait for async processing to complete using latch - boolean completed = taskCompletionLatch.await(10, TimeUnit.SECONDS); - assertThat(completed).isTrue(); - + GenericTestUtils.waitFor(() -> { + try { + ReconTaskStatus status = reconTaskStatusDao.findById("MockTask"); + return status != null + && status.getLastTaskRunStatus() == 0 + && status.getLastUpdatedSeqNumber() == 100L; + } catch (Exception e) { + return false; + } + }, 100, 5000); + verify(reconOmTaskMock, times(1)) .process(any(), anyMap()); long endTime = System.currentTimeMillis(); @@ -236,9 +246,17 @@ public void testFailedTaskRetryLogic() throws Exception { reconTaskController.consumeOMEvents(omUpdateEventBatchMock, mock(OMMetadataManager.class)); - // Wait for async processing to complete - Thread.sleep(3000); // Increase timeout for retry logic - + GenericTestUtils.waitFor(() -> { + try { + ReconTaskStatus status = reconTaskStatusDao.findById(taskName); + return status != null + && status.getLastTaskRunStatus() == 0 + && status.getLastUpdatedSeqNumber() == 100L; + } catch (Exception 
e) { + return false; + } + }, 100, 5000); + assertThat(reconTaskController.getRegisteredTasks()).isNotEmpty(); assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks() .get(dummyReconDBTask.getTaskName())); @@ -350,6 +368,7 @@ public void testQueueReInitializationEventSuccess() throws Exception { when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath); reconTaskController.updateOMMetadataManager(mockOMMetadataManager); + reconTaskController.stop(); // Test successful queueing - the checkpoint creation should work with proper mocks ReconTaskController.ReInitializationResult result = reconTaskController.queueReInitializationEvent( @@ -377,6 +396,7 @@ public void testQueueReInitializationEventCheckpointFailure() throws Exception { when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath); reconTaskController.updateOMMetadataManager(mockOMMetadataManager); + reconTaskController.stop(); // Create a spy of the controller to mock checkpoint creation failure ReconTaskControllerImpl controllerSpy = spy((ReconTaskControllerImpl) reconTaskController); @@ -471,6 +491,7 @@ public void testUpdateOMMetadataManager() throws Exception { // Update with first manager reconTaskController.updateOMMetadataManager(mockManager1); + reconTaskController.stop(); // Test that the manager was updated correctly by attempting to queue a reinitialization event ReconTaskController.ReInitializationResult result = reconTaskController.queueReInitializationEvent( @@ -495,6 +516,7 @@ public void testCheckpointManagerCleanupOnQueueFailure() throws Exception { when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath); reconTaskController.updateOMMetadataManager(mockOMMetadataManager); + reconTaskController.stop(); // This test verifies the successful path - in practice, queue failure after clear is very rare // since we clear the buffer before queueing the reinitialization event @@ -524,6 +546,7 @@ public void 
testNewRetryLogicWithSuccessfulCheckpoint() throws Exception { ReconTaskControllerImpl controllerImpl = (ReconTaskControllerImpl) reconTaskController; controllerImpl.updateOMMetadataManager(mockOMMetadataManager); + controllerImpl.stop(); // Reset any previous retry state controllerImpl.resetRetryCounters(); @@ -545,6 +568,7 @@ public void testNewRetryLogicWithMaxRetriesExceeded() throws Exception { ReconOMMetadataManager mockOMMetadataManager = mock(ReconOMMetadataManager.class); ReconTaskControllerImpl controllerImpl = (ReconTaskControllerImpl) reconTaskController; controllerImpl.updateOMMetadataManager(mockOMMetadataManager); + controllerImpl.stop(); // Reset any previous retry state controllerImpl.resetRetryCounters(); @@ -561,7 +585,7 @@ public void testNewRetryLogicWithMaxRetriesExceeded() throws Exception { // Iterations 1-6: should return RETRY_LATER and increment retry count for (int i = 1; i <= 6; i++) { if (i > 1) { - Thread.sleep(2100); // Wait for retry delay + Thread.sleep(2500); // Wait for retry delay } result = controllerSpy.queueReInitializationEvent( ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW); @@ -572,7 +596,7 @@ public void testNewRetryLogicWithMaxRetriesExceeded() throws Exception { // Iteration 7: should return MAX_RETRIES_EXCEEDED (eventProcessRetryCount is now 6, // which >= MAX_EVENT_PROCESS_RETRIES) - Thread.sleep(2100); // Wait for retry delay + Thread.sleep(2500); // Wait for retry delay result = controllerSpy.queueReInitializationEvent( ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW); assertEquals(ReconTaskController.ReInitializationResult.MAX_RETRIES_EXCEEDED, result, From 61f1a26496c7037c42e9eab42b9da63dc60a8028 Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Sat, 6 Jun 2026 16:51:56 +0800 Subject: [PATCH 2/2] Restore the original 2100 ms Thread.sleep() retry delays --- .../hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff
--git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java index 341e6e3e527..1668a1e9fb3 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java @@ -585,7 +585,7 @@ public void testNewRetryLogicWithMaxRetriesExceeded() throws Exception { // Iterations 1-6: should return RETRY_LATER and increment retry count for (int i = 1; i <= 6; i++) { if (i > 1) { - Thread.sleep(2500); // Wait for retry delay + Thread.sleep(2100); // Wait for retry delay } result = controllerSpy.queueReInitializationEvent( ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW); @@ -596,7 +596,7 @@ public void testNewRetryLogicWithMaxRetriesExceeded() throws Exception { // Iteration 7: should return MAX_RETRIES_EXCEEDED (eventProcessRetryCount is now 6, // which >= MAX_EVENT_PROCESS_RETRIES) - Thread.sleep(2500); // Wait for retry delay + Thread.sleep(2100); // Wait for retry delay result = controllerSpy.queueReInitializationEvent( ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW); assertEquals(ReconTaskController.ReInitializationResult.MAX_RETRIES_EXCEEDED, result,