Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.ozone.recon.schema.generated.tables.daos.ReconTaskStatusDao;
import org.apache.ozone.recon.schema.generated.tables.pojos.ReconTaskStatus;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -96,6 +97,13 @@ public void setUp() throws IOException {
reconTaskController.start();
}

@AfterEach
public void tearDown() {
if (reconTaskController != null) {
reconTaskController.stop();
}
}

@Test
public void testStopCompletesPromptly() {
// stop() must not block on the graceful shutdown timeout. The event
Expand All @@ -122,15 +130,10 @@ public void testRegisterTask() {

@Test
public void testConsumeOMEvents() throws Exception {
// Use CountDownLatch to wait for async processing
CountDownLatch taskCompletionLatch = new CountDownLatch(1);

ReconOmTask reconOmTaskMock = getMockTask("MockTask");
when(reconOmTaskMock.process(any(OMUpdateEventBatch.class), anyMap()))
.thenAnswer(invocation -> {
taskCompletionLatch.countDown(); // Signal task completion
return new ReconOmTask.TaskResult.Builder().setTaskName("MockTask").setTaskSuccess(true).build();
});
.thenReturn(new ReconOmTask.TaskResult.Builder()
.setTaskName("MockTask").setTaskSuccess(true).build());
reconTaskController.registerTask(reconOmTaskMock);

OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
Expand All @@ -145,10 +148,17 @@ public void testConsumeOMEvents() throws Exception {
omUpdateEventBatchMock,
mock(OMMetadataManager.class));

// Wait for async processing to complete using latch
boolean completed = taskCompletionLatch.await(10, TimeUnit.SECONDS);
assertThat(completed).isTrue();

GenericTestUtils.waitFor(() -> {
try {
ReconTaskStatus status = reconTaskStatusDao.findById("MockTask");
return status != null
&& status.getLastTaskRunStatus() == 0
&& status.getLastUpdatedSeqNumber() == 100L;
} catch (Exception e) {
return false;
}
}, 100, 5000);

verify(reconOmTaskMock, times(1))
.process(any(), anyMap());
long endTime = System.currentTimeMillis();
Expand Down Expand Up @@ -236,9 +246,17 @@ public void testFailedTaskRetryLogic() throws Exception {
reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
mock(OMMetadataManager.class));

// Wait for async processing to complete
Thread.sleep(3000); // Increase timeout for retry logic

GenericTestUtils.waitFor(() -> {
try {
ReconTaskStatus status = reconTaskStatusDao.findById(taskName);
return status != null
&& status.getLastTaskRunStatus() == 0
&& status.getLastUpdatedSeqNumber() == 100L;
} catch (Exception e) {
return false;
}
}, 100, 5000);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This waitFor block is duplicated verbatim in testConsumeOMEvents and testFailedTaskRetryLogic. We can consider extracting a small helper.

Not a blocker, happy to merge as is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. I agree a helper could clean this up, but I’ll keep it as-is for this patch since the duplication is small and the current form keeps the wait condition explicit.


assertThat(reconTaskController.getRegisteredTasks()).isNotEmpty();
assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
.get(dummyReconDBTask.getTaskName()));
Expand Down Expand Up @@ -350,6 +368,7 @@ public void testQueueReInitializationEventSuccess() throws Exception {
when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath);

reconTaskController.updateOMMetadataManager(mockOMMetadataManager);
reconTaskController.stop();

// Test successful queueing - the checkpoint creation should work with proper mocks
ReconTaskController.ReInitializationResult result = reconTaskController.queueReInitializationEvent(
Expand Down Expand Up @@ -377,6 +396,7 @@ public void testQueueReInitializationEventCheckpointFailure() throws Exception {
when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath);

reconTaskController.updateOMMetadataManager(mockOMMetadataManager);
reconTaskController.stop();

// Create a spy of the controller to mock checkpoint creation failure
ReconTaskControllerImpl controllerSpy = spy((ReconTaskControllerImpl) reconTaskController);
Expand Down Expand Up @@ -471,6 +491,7 @@ public void testUpdateOMMetadataManager() throws Exception {

// Update with first manager
reconTaskController.updateOMMetadataManager(mockManager1);
reconTaskController.stop();

// Test that the manager was updated correctly by attempting to queue a reinitialization event
ReconTaskController.ReInitializationResult result = reconTaskController.queueReInitializationEvent(
Expand All @@ -495,6 +516,7 @@ public void testCheckpointManagerCleanupOnQueueFailure() throws Exception {
when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath);

reconTaskController.updateOMMetadataManager(mockOMMetadataManager);
reconTaskController.stop();

// This test verifies the successful path - in practice, queue failure after clear is very rare
// since we clear the buffer before queueing the reinitialization event
Expand Down Expand Up @@ -524,6 +546,7 @@ public void testNewRetryLogicWithSuccessfulCheckpoint() throws Exception {

ReconTaskControllerImpl controllerImpl = (ReconTaskControllerImpl) reconTaskController;
controllerImpl.updateOMMetadataManager(mockOMMetadataManager);
controllerImpl.stop();

// Reset any previous retry state
controllerImpl.resetRetryCounters();
Expand All @@ -545,6 +568,7 @@ public void testNewRetryLogicWithMaxRetriesExceeded() throws Exception {
ReconOMMetadataManager mockOMMetadataManager = mock(ReconOMMetadataManager.class);
ReconTaskControllerImpl controllerImpl = (ReconTaskControllerImpl) reconTaskController;
controllerImpl.updateOMMetadataManager(mockOMMetadataManager);
controllerImpl.stop();

// Reset any previous retry state
controllerImpl.resetRetryCounters();
Expand Down