diff --git a/core/src/test/java/kafka/log/s3/S3StorageTest.java b/core/src/test/java/kafka/log/s3/S3StorageTest.java index 36bb08822f..684bb9f274 100644 --- a/core/src/test/java/kafka/log/s3/S3StorageTest.java +++ b/core/src/test/java/kafka/log/s3/S3StorageTest.java @@ -43,10 +43,10 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import static kafka.log.s3.TestUtils.random; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -141,12 +141,13 @@ public void testWALCallbackSequencer() { @Test public void testUploadWALObject_sequence() throws ExecutionException, InterruptedException, TimeoutException { - List> objectIdCfList = new CopyOnWriteArrayList<>(); - doAnswer(invocation -> { - CompletableFuture objectIdCf = new CompletableFuture<>(); - objectIdCfList.add(objectIdCf); - return objectIdCf; - }).when(objectManager).prepareObject(anyInt(), anyLong()); + List> objectIdCfList = List.of(new CompletableFuture<>(), new CompletableFuture<>()); + AtomicInteger objectCfIndex = new AtomicInteger(); + doAnswer(invocation -> objectIdCfList.get(objectCfIndex.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong()); + + List> commitCfList = List.of(new CompletableFuture<>(), new CompletableFuture<>()); + AtomicInteger commitCfIndex = new AtomicInteger(); + doAnswer(invocation -> commitCfList.get(commitCfIndex.getAndIncrement())).when(objectManager).commitWALObject(any()); LogCache.LogCacheBlock logCacheBlock1 = new LogCache.LogCacheBlock(1024); logCacheBlock1.put(newRecord(233L, 10L)); @@ -163,13 +164,6 @@ public void testUploadWALObject_sequence() throws ExecutionException, Interrupte // sequence get objectId verify(objectManager, timeout(1000).times(1)).prepareObject(anyInt(), anyLong()); - List> commitCfList = new CopyOnWriteArrayList<>(); - doAnswer(invocation -> { - CompletableFuture cf = new CompletableFuture<>(); - commitCfList.add(cf); - return cf; - }).when(objectManager).commitWALObject(any()); - objectIdCfList.get(0).complete(1L); // trigger next upload prepare objectId verify(objectManager, timeout(1000).times(2)).prepareObject(anyInt(), anyLong());