diff --git a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
index c8aaabe8fd..070f09a660 100644
--- a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
+++ b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
@@ -28,7 +28,6 @@
 import kafka.log.es.api.RecordBatch;
 import kafka.log.es.api.Stream;
 import kafka.log.es.api.StreamClient;
-import kafka.log.s3.S3StreamClient;
 import org.apache.kafka.common.errors.es.SlowFetchHintException;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.slf4j.Logger;
@@ -69,7 +68,7 @@ public class AlwaysSuccessClient implements Client {
         ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
     private final ScheduledExecutorService delayFetchScheduler = Executors.newScheduledThreadPool(1,
         ThreadUtils.createThreadFactory("fetch-delayer-%d", true));
-    private final StreamClientImpl streamClient;
+    private final StreamClient streamClient;
     private final KVClient kvClient;
     private final Delayer delayer;
     /**
@@ -185,9 +184,7 @@ public CompletableFuture openStream(long streamId, OpenStreamOptions opt
     }

     public void shutdown() {
-        if (streamClient instanceof S3StreamClient) {
-            ((S3StreamClient) streamClient).shutdown();
-        }
+        streamClient.shutdown();
     }

     private void openStream0(long streamId, OpenStreamOptions options, CompletableFuture<Stream> cf) {
diff --git a/core/src/main/scala/kafka/log/es/ElasticRedisClient.java b/core/src/main/scala/kafka/log/es/ElasticRedisClient.java
index 2619d561a9..7e78d695dc 100644
--- a/core/src/main/scala/kafka/log/es/ElasticRedisClient.java
+++ b/core/src/main/scala/kafka/log/es/ElasticRedisClient.java
@@ -188,6 +188,11 @@ public CompletableFuture createAndOpenStream(CreateStreamOptions createS
         public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
             return CompletableFuture.completedFuture(new StreamImpl(jedis, streamId));
         }
+
+        @Override
+        public void shutdown() {
+
+        }
     }

     static class KVClientImpl implements KVClient {
diff --git a/core/src/main/scala/kafka/log/es/MemoryClient.java b/core/src/main/scala/kafka/log/es/MemoryClient.java
index 117a7fae05..e552e534f1 100644
--- a/core/src/main/scala/kafka/log/es/MemoryClient.java
+++ b/core/src/main/scala/kafka/log/es/MemoryClient.java
@@ -126,6 +126,11 @@ public CompletableFuture createAndOpenStream(CreateStreamOptions createS
         public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
             return CompletableFuture.completedFuture(new StreamImpl(streamId));
         }
+
+        @Override
+        public void shutdown() {
+
+        }
     }

     public static class KVClientImpl implements KVClient {
diff --git a/core/src/main/scala/kafka/log/es/api/StreamClient.java b/core/src/main/scala/kafka/log/es/api/StreamClient.java
index e7c94b34d5..e3e3aa003a 100644
--- a/core/src/main/scala/kafka/log/es/api/StreamClient.java
+++ b/core/src/main/scala/kafka/log/es/api/StreamClient.java
@@ -39,4 +39,6 @@ public interface StreamClient {
      * @return {@link Stream}.
      */
     CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions options);
+
+    void shutdown();
 }
diff --git a/core/src/main/scala/kafka/log/es/utils/Threads.java b/core/src/main/scala/kafka/log/es/utils/Threads.java
index 94cee7d080..eae2e73327 100644
--- a/core/src/main/scala/kafka/log/es/utils/Threads.java
+++ b/core/src/main/scala/kafka/log/es/utils/Threads.java
@@ -19,14 +19,17 @@
 import org.slf4j.Logger;

-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;

 public class Threads {

-    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory, Logger logger) {
-        return new ScheduledThreadPoolExecutor(1, threadFactory) {
+    public static ScheduledThreadPoolExecutor newSingleThreadScheduledExecutor(ThreadFactory threadFactory, Logger logger) {
+        return newSingleThreadScheduledExecutor(threadFactory, logger, false);
+    }
+
+    public static ScheduledThreadPoolExecutor newSingleThreadScheduledExecutor(ThreadFactory threadFactory, Logger logger, boolean removeOnCancelPolicy) {
+        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory) {
             @Override
             protected void afterExecute(Runnable r, Throwable t) {
                 super.afterExecute(r, t);
@@ -35,6 +38,8 @@ protected void afterExecute(Runnable r, Throwable t) {
                 }
             }
         };
+        executor.setRemoveOnCancelPolicy(removeOnCancelPolicy);
+        return executor;
     }
 }
diff --git a/core/src/main/scala/kafka/log/s3/S3Storage.java b/core/src/main/scala/kafka/log/s3/S3Storage.java
index 9efde48c18..53dd80d992 100644
--- a/core/src/main/scala/kafka/log/s3/S3Storage.java
+++ b/core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -62,9 +62,6 @@ public class S3Storage implements Storage {
     private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor(
             ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER);
-    private final ScheduledExecutorService streamObjectCompactionExecutor = Threads.newSingleThreadScheduledExecutor(
-            ThreadUtils.createThreadFactory("stream-object-compaction-background", true), LOGGER);
-
     private final ObjectManager objectManager;
     private final S3Operator s3Operator;
     private final S3BlockCache blockCache;
@@ -83,7 +80,6 @@ public S3Storage(KafkaConfig config, WriteAheadLog log, ObjectManager objectMana
     public void close() {
         mainExecutor.shutdown();
         backgroundExecutor.shutdown();
-        streamObjectCompactionExecutor.shutdown();
     }

     @Override
diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java
index 3378586e22..21184e418f 100644
--- a/core/src/main/scala/kafka/log/s3/S3Stream.java
+++ b/core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -55,13 +55,10 @@ public class S3Stream implements Stream {
     private final StreamManager streamManager;
     private final Status status;
     private final Function<Long, Void> closeHook;
-    private StreamObjectsCompactionTask streamObjectsCompactionTask;
+    private final StreamObjectsCompactionTask streamObjectsCompactionTask;

-    public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage, StreamManager streamManager) {
-        this(streamId, epoch, startOffset, nextOffset, storage, streamManager, x -> null);
-    }
-
-    public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage, StreamManager streamManager, Function<Long, Void> closeHook) {
+    public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
+                    StreamManager streamManager, StreamObjectsCompactionTask.Builder compactionTaskBuilder, Function<Long, Void> closeHook) {
         this.streamId = streamId;
         this.epoch = epoch;
         this.startOffset = startOffset;
@@ -71,18 +68,11 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, St
         this.status = new Status();
         this.storage = storage;
         this.streamManager = streamManager;
+        this.streamObjectsCompactionTask = compactionTaskBuilder.withStream(this).build();
         this.closeHook = closeHook;
     }

-    public void initCompactionTask(StreamObjectsCompactionTask streamObjectsCompactionTask) {
-        this.streamObjectsCompactionTask = streamObjectsCompactionTask;
-    }
-
     public void triggerCompactionTask() throws ExecutionException, InterruptedException {
-        if (streamObjectsCompactionTask == null) {
-            throw new RuntimeException("stream objects compaction task is null");
-        }
-
         streamObjectsCompactionTask.prepare();
         streamObjectsCompactionTask.doCompactions().get();
     }
diff --git a/core/src/main/scala/kafka/log/s3/S3StreamClient.java b/core/src/main/scala/kafka/log/s3/S3StreamClient.java
index ab5d1e6865..07a83003a9 100644
--- a/core/src/main/scala/kafka/log/s3/S3StreamClient.java
+++ b/core/src/main/scala/kafka/log/s3/S3StreamClient.java
@@ -22,7 +22,8 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import kafka.log.es.api.CreateStreamOptions;
 import kafka.log.es.api.OpenStreamOptions;
@@ -43,8 +44,9 @@ public class S3StreamClient implements StreamClient {

     private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class);
-    private final ScheduledExecutorService streamObjectCompactionExecutor = Threads.newSingleThreadScheduledExecutor(
-            ThreadUtils.createThreadFactory("stream-object-compaction-background", true), LOGGER);
+    private final ScheduledThreadPoolExecutor streamObjectCompactionExecutor = Threads.newSingleThreadScheduledExecutor(
+            ThreadUtils.createThreadFactory("stream-object-compaction-background", true), LOGGER, true);
+    private ScheduledFuture<?> scheduledCompactionTaskFuture;
     private final Map<Long, S3Stream> openedStreams;

     private final StreamManager streamManager;
@@ -78,7 +80,7 @@ public CompletableFuture openStream(long streamId, OpenStreamOptions ope
      * Start stream objects compactions.
      */
     private void startStreamObjectsCompactions() {
-        streamObjectCompactionExecutor.scheduleWithFixedDelay(() -> {
+        scheduledCompactionTaskFuture = streamObjectCompactionExecutor.scheduleWithFixedDelay(() -> {
             List<S3Stream> operationStreams = new LinkedList<>(openedStreams.values());
             operationStreams.forEach(stream -> {
                 if (stream.isClosed()) {
@@ -101,25 +103,26 @@ private void startStreamObjectsCompactions() {

     private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
         return streamManager.openStream(streamId, epoch).
                 thenApply(metadata -> {
+                    StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
+                        .withCompactedStreamObjectMaxSize(config.s3StreamObjectCompactionMaxSize())
+                        .withCompactableStreamObjectLivingTimeInMs(config.s3StreamObjectCompactionLivingTimeThreshold());
                     S3Stream stream = new S3Stream(
                             metadata.getStreamId(), metadata.getEpoch(),
                             metadata.getStartOffset(), metadata.getNextOffset(),
-                            storage, streamManager, id -> {
-                        openedStreams.remove(id);
-                        return null;
-                    });
-                    stream.initCompactionTask(generateStreamObjectsCompactionTask(stream));
+                            storage, streamManager, builder, id -> {
+                        openedStreams.remove(id);
+                        return null;
+                    });
                     openedStreams.put(streamId, stream);
                     return stream;
                 });
     }

-    private StreamObjectsCompactionTask generateStreamObjectsCompactionTask(S3Stream stream) {
-        return new StreamObjectsCompactionTask(objectManager, s3Operator, stream,
-            config.s3StreamObjectCompactionMaxSize(), config.s3StreamObjectCompactionLivingTimeThreshold());
-    }
-
     public void shutdown() {
+        // cancel the submitted task if not started; do not interrupt the task if it is running.
+        if (scheduledCompactionTaskFuture != null) {
+            scheduledCompactionTaskFuture.cancel(false);
+        }
         streamObjectCompactionExecutor.shutdown();
     }
 }
diff --git a/core/src/main/scala/kafka/log/s3/StreamObjectsCompactionTask.java b/core/src/main/scala/kafka/log/s3/StreamObjectsCompactionTask.java
index 66d5a31889..5d20f0a96a 100644
--- a/core/src/main/scala/kafka/log/s3/StreamObjectsCompactionTask.java
+++ b/core/src/main/scala/kafka/log/s3/StreamObjectsCompactionTask.java
@@ -18,7 +18,6 @@ package kafka.log.s3;

 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -136,7 +135,10 @@ public void prepare() {

     public Queue<List<S3StreamObjectMetadata>> prepareCompactGroups(long startSearchingOffset) {
         long startOffset = Utils.max(startSearchingOffset, stream.startOffset());
         List<S3StreamObjectMetadata> rawFetchedStreamObjects = objectManager
-            .getStreamObjects(stream.streamId(), startOffset, stream.nextOffset(), Integer.MAX_VALUE);
+            .getStreamObjects(stream.streamId(), startOffset, stream.nextOffset(), Integer.MAX_VALUE)
+            .stream()
+            .sorted()
+            .collect(Collectors.toList());

         this.nextStartSearchingOffset = calculateNextStartSearchingOffset(rawFetchedStreamObjects, startOffset);
@@ -144,7 +146,6 @@ public Queue> prepareCompactGroups(long startSearch
             .stream()
             .filter(streamObject -> streamObject.objectSize() < compactedStreamObjectMaxSize)
             .collect(Collectors.toList());
-        Collections.sort(streamObjects);

         return groupContinuousObjects(streamObjects)
             .stream()
@@ -276,4 +277,32 @@ public HaltException(String message) {
             super(message);
         }
     }
+
+    public static class Builder {
+        private ObjectManager objectManager;
+        private S3Operator s3Operator;
+        private S3Stream stream;
+        private long compactedStreamObjectMaxSize;
+        private long compactableStreamObjectLivingTimeInMs;
+
+        public Builder(ObjectManager objectManager, S3Operator s3Operator) {
+            this.objectManager = objectManager;
+            this.s3Operator = s3Operator;
+        }
+        public Builder withStream(S3Stream stream) {
+            this.stream = stream;
+            return this;
+        }
+        public Builder withCompactedStreamObjectMaxSize(long compactedStreamObjectMaxSize) {
+            this.compactedStreamObjectMaxSize = compactedStreamObjectMaxSize;
+            return this;
+        }
+        public Builder withCompactableStreamObjectLivingTimeInMs(long compactableStreamObjectLivingTimeInMs) {
+            this.compactableStreamObjectLivingTimeInMs = compactableStreamObjectLivingTimeInMs;
+            return this;
+        }
+        public StreamObjectsCompactionTask build() {
+            return new StreamObjectsCompactionTask(objectManager, s3Operator, stream, compactedStreamObjectMaxSize, compactableStreamObjectLivingTimeInMs);
+        }
+    }
 }
diff --git a/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java b/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java
index e31f7ec171..d731b409b7 100644
--- a/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java
+++ b/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java
@@ -321,6 +321,11 @@ public CompletableFuture openStream(long streamId, OpenStreamOptions ope
             }
             return CompletableFuture.completedFuture(new TestStreamImpl(streamId, delayMillis, exceptionHint));
         }
+
+        @Override
+        public void shutdown() {
+
+        }
     }

     static class TestStreamImpl implements Stream {
diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java
index 8388c6a6d8..15f447a024 100644
--- a/core/src/test/java/kafka/log/s3/S3StreamTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java
@@ -47,7 +47,8 @@ public class S3StreamTest {
     public void setup() {
         storage = mock(Storage.class);
         streamManager = mock(StreamManager.class);
-        stream = new S3Stream(233, 1, 100, 233, storage, streamManager);
+        StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(null, null);
+        stream = new S3Stream(233, 1, 100, 233, storage, streamManager, builder, v -> null);
     }

     @Test
diff --git a/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java b/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java
index 12f4317dae..20d0f0984d 100644
--- a/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java
+++ b/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java
@@ -36,7 +36,7 @@
 @Tag("S3Unit")
 public class StreamObjectCopyerTest {
     @Test
-    public void testCompact() throws ExecutionException, InterruptedException {
+    public void testCopy() throws ExecutionException, InterruptedException {
         long targetObjectId = 10;
         long streamId = 233;
diff --git a/core/src/test/java/kafka/log/s3/StreamObjectsCompactionTaskTest.java b/core/src/test/java/kafka/log/s3/StreamObjectsCompactionTaskTest.java
index 36d63ff715..e54010eb94 100644
--- a/core/src/test/java/kafka/log/s3/StreamObjectsCompactionTaskTest.java
+++ b/core/src/test/java/kafka/log/s3/StreamObjectsCompactionTaskTest.java
@@ -17,9 +17,21 @@

 package kafka.log.s3;

+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import kafka.log.s3.model.StreamRecordBatch;
+import kafka.log.s3.objects.CommitStreamObjectRequest;
+import kafka.log.s3.objects.CommitWALObjectRequest;
+import kafka.log.s3.objects.CommitWALObjectResponse;
 import kafka.log.s3.objects.ObjectManager;
+import kafka.log.s3.objects.StreamObject;
+import kafka.log.s3.operator.MemoryS3Operator;
 import kafka.log.s3.operator.S3Operator;
 import org.apache.kafka.metadata.stream.S3StreamObject;
 import org.apache.kafka.metadata.stream.S3StreamObjectMetadata;
@@ -28,7 +40,15 @@
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;

+import static kafka.log.s3.TestUtils.random;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;

 @Tag("S3Unit")
 class StreamObjectsCompactionTaskTest {
@@ -39,20 +59,151 @@ class StreamObjectsCompactionTaskTest {

     @BeforeEach
     void setUp() {
         objectManager = Mockito.mock(ObjectManager.class);
-        s3Operator = Mockito.mock(S3Operator.class);
+        s3Operator = new MemoryS3Operator();
         stream = Mockito.mock(S3Stream.class);
-        Mockito.when(stream.streamId()).thenReturn(1L);
-        Mockito.when(stream.startOffset()).thenReturn(5L);
-        Mockito.when(stream.nextOffset()).thenReturn(100L);
+        when(stream.streamId()).thenReturn(1L);
+        when(stream.startOffset()).thenReturn(5L);
+        when(stream.nextOffset()).thenReturn(100L);
     }

     @Test
-    void prepareCompactGroups() {
+    void testTriggerTask() throws ExecutionException, InterruptedException {
+        // Prepare 4 stream objects. They should be compacted into 2 new stream objects.
+        List<List<Long>> objectsDetails = List.of(
+            List.of(40L, 50L, 1000L),
+            List.of(50L, 60L, 1000L),
+            List.of(65L, 70L, 1000L),
+            List.of(70L, 80L, 1000L)
+        );
+        List<S3StreamObjectMetadata> metadataList = prepareRawStreamObjects(10, stream.streamId(), objectsDetails);
+
+        // two stream object groups should be handled
+        when(objectManager.getStreamObjects(anyLong(), anyLong(), anyLong(), anyInt()))
+            .thenReturn(metadataList);
+        StreamObjectsCompactionTask task = new StreamObjectsCompactionTask(objectManager, s3Operator, stream, Long.MAX_VALUE, 0);
+
+        AtomicLong objectIdAlloc = new AtomicLong(100);
+        List<CommitStreamObjectRequest> committedRequests = new ArrayList<>();
+        when(objectManager.prepareObject(anyInt(), anyLong())).thenAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement()));
+        when(objectManager.commitStreamObject(any(CommitStreamObjectRequest.class))).thenAnswer(invocation -> {
+            committedRequests.add(invocation.getArgument(0));
+            return CompletableFuture.completedFuture(null);
+        });
+
+        // trigger a stream object compaction task
+        task.prepare();
+        task.doCompactions();
+
+        // 40L > stream.startOffset, expecting no changes to startSearchingOffset
+        assertEquals(stream.startOffset(), task.getNextStartSearchingOffset());
+
+        verify(objectManager, Mockito.times(2)).commitStreamObject(any(CommitStreamObjectRequest.class));
+
+        assertEquals(100, committedRequests.get(0).getObjectId());
+        assertEquals(40, committedRequests.get(0).getStartOffset());
+        assertEquals(60, committedRequests.get(0).getEndOffset());
+        assertEquals(List.of(11L, 13L), committedRequests.get(0).getSourceObjectIds());
+        assertEquals(101, committedRequests.get(1).getObjectId());
+        assertEquals(65, committedRequests.get(1).getStartOffset());
+        assertEquals(80, committedRequests.get(1).getEndOffset());
+        assertEquals(List.of(15L, 17L), committedRequests.get(1).getSourceObjectIds());
+    }
+
+    @Test
+    void testTriggerTaskFailure() throws InterruptedException {
+        // 2 compaction groups will be handled.
+        when(objectManager.getStreamObjects(anyLong(), anyLong(), anyLong(), anyInt()))
+            .thenReturn(List.of(
+                new S3StreamObjectMetadata(new S3StreamObject(1, 150, 1, 5, 10), 0),
+                new S3StreamObjectMetadata(new S3StreamObject(2, 20, 1, 10, 20), 0),
+                new S3StreamObjectMetadata(new S3StreamObject(3, 20, 1, 40, 50), 0),
+                new S3StreamObjectMetadata(new S3StreamObject(4, 20, 1, 50, 60), 0),
+                new S3StreamObjectMetadata(new S3StreamObject(5, 20, 1, 65, 70), 0),
+                new S3StreamObjectMetadata(new S3StreamObject(6, 20, 1, 70, 80), 0)
+            ));
+        when(objectManager.prepareObject(anyInt(), anyLong())).thenReturn(CompletableFuture.failedFuture(new RuntimeException("halt compaction task")));
+
+        // The first group's compaction failed in prepareObject phase, the second group should not be handled.
+        StreamObjectsCompactionTask task = new StreamObjectsCompactionTask(objectManager, s3Operator, stream, 100, 0);
+        task.prepare();
+        try {
+            task.doCompactions().get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof RuntimeException, "should throw RuntimeException");
+        }
+        verify(objectManager, Mockito.times(1)).prepareObject(anyInt(), anyLong());
+        verify(objectManager, Mockito.times(0)).commitStreamObject(any(CommitStreamObjectRequest.class));
+
+        // The first group's compaction failed due to stream's closure, the second group should not be handled.
+        when(stream.isClosed()).thenReturn(true);
+        task.prepare();
+        try {
+            task.doCompactions().get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof StreamObjectsCompactionTask.HaltException, "should throw HaltException");
+        }
+        verify(objectManager, Mockito.times(1)).prepareObject(anyInt(), anyLong());
+        verify(objectManager, Mockito.times(0)).commitStreamObject(any(CommitStreamObjectRequest.class));
+
+    }
+
+    /**
+     * Prepare raw stream objects for compaction test
+     *
+     * @param startObjectId  start object id
+     * @param streamId       stream id
+     * @param objectsDetails list of [startOffset, endOffset, recordsSize]. Each item in the list will be used to generate a stream object.
+     * @return list of stream object metadata
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    List<S3StreamObjectMetadata> prepareRawStreamObjects(long startObjectId, long streamId,
+        List<List<Long>> objectsDetails) throws ExecutionException, InterruptedException {
+        AtomicLong objectIdAlloc = new AtomicLong(startObjectId);
+        Stack<CommitWALObjectRequest> commitWALObjectRequests = new Stack<>();
+        doAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong());
+        when(objectManager.commitWALObject(any())).thenAnswer(invocation -> {
+            commitWALObjectRequests.push(invocation.getArgument(0));
+            return CompletableFuture.completedFuture(new CommitWALObjectResponse());
+        });
+
+        List<S3StreamObjectMetadata> metadataList = new ArrayList<>();
+
+        for (int i = 0; i < objectsDetails.size(); i++) {
+            List<Long> objectsDetail = objectsDetails.get(i);
+            long startOffset = objectsDetail.get(0);
+            long endOffset = objectsDetail.get(1);
+            int recordsSize = Math.toIntExact(objectsDetail.get(2));
+
+            Map<Long, List<StreamRecordBatch>> map = Map.of(streamId, List.of(new StreamRecordBatch(streamId, 0, startOffset, Math.toIntExact(endOffset - startOffset), random(recordsSize))));
+            WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(map, objectManager, s3Operator, 16 * 1024 * 1024, 16 * 1024 * 1024, recordsSize - 1);
+
+            walObjectUploadTask.prepare().get();
+            walObjectUploadTask.upload().get();
+            walObjectUploadTask.commit().get();
+
+            CommitWALObjectRequest request = commitWALObjectRequests.pop();
+            assertEquals(startObjectId + i * 2L, request.getObjectId());
+
+            assertEquals(1, request.getStreamObjects().size());
+            StreamObject streamObject = request.getStreamObjects().get(0);
+            assertEquals(streamId, streamObject.getStreamId());
+            assertEquals(startObjectId + i * 2L + 1, streamObject.getObjectId());
+            assertEquals(startOffset, streamObject.getStartOffset());
+            assertEquals(endOffset, streamObject.getEndOffset());
+
+            metadataList.add(new S3StreamObjectMetadata(new S3StreamObject(streamObject.getObjectId(), streamObject.getObjectSize(), streamObject.getStreamId(), streamObject.getStartOffset(), streamObject.getEndOffset()), System.currentTimeMillis()));
+        }
+        return metadataList;
+    }
+
+    @Test
+    void testPrepareCompactGroups() {
         // check if we can filter groups without limit of timestamp
         StreamObjectsCompactionTask task1 = new StreamObjectsCompactionTask(objectManager, s3Operator, stream, 100, 0);
         long currentTimestamp = System.currentTimeMillis();
-        Mockito.when(objectManager.getStreamObjects(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyInt()))
+        when(objectManager.getStreamObjects(anyLong(), anyLong(), anyLong(), anyInt()))
             .thenReturn(List.of(
                 new S3StreamObjectMetadata(new S3StreamObject(1, 150, 1, 5, 10), currentTimestamp),
                 new S3StreamObjectMetadata(new S3StreamObject(2, 20, 1, 10, 20), currentTimestamp),
@@ -74,7 +225,7 @@ void prepareCompactGroups() {

         StreamObjectsCompactionTask task2 = new StreamObjectsCompactionTask(objectManager, s3Operator, stream, 100, 10000);
         currentTimestamp = System.currentTimeMillis();
-        Mockito.when(objectManager.getStreamObjects(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyInt()))
+        when(objectManager.getStreamObjects(anyLong(), anyLong(), anyLong(), anyInt()))
             .thenReturn(List.of(
                 new S3StreamObjectMetadata(new S3StreamObject(1, 60, 1, 5, 10), currentTimestamp - 20000),
                 new S3StreamObjectMetadata(new S3StreamObject(2, 20, 1, 10, 40), currentTimestamp),
@@ -83,7 +234,8 @@ void prepareCompactGroups() {
                 new S3StreamObjectMetadata(new S3StreamObject(5, 20, 1, 60, 70), currentTimestamp - 30000),
                 new S3StreamObjectMetadata(new S3StreamObject(6, 20, 1, 70, 80), currentTimestamp - 30000),
                 new S3StreamObjectMetadata(new S3StreamObject(7, 20, 1, 80, 90), currentTimestamp - 30000),
-                new S3StreamObjectMetadata(new S3StreamObject(8, 20, 1, 90, 99), currentTimestamp)
+                new S3StreamObjectMetadata(new S3StreamObject(8, 80, 1, 90, 95), currentTimestamp - 30000),
+                new S3StreamObjectMetadata(new S3StreamObject(9, 20, 1, 95, 99), currentTimestamp)
             ));
         compactGroups = task2.prepareCompactGroups(0);
         assertEquals(1, compactGroups.size());
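
For reference, a minimal sketch of how the reworked pieces fit together after this patch: the compaction task now comes from StreamObjectsCompactionTask.Builder and is built inside the S3Stream constructor, while shutdown is declared on the StreamClient interface. The helper method names below and the assumption that objectManager, s3Operator, config, storage, streamManager and streamClient are in scope are illustrative only, not code from the patch.

    // Sketch: wiring the Builder-based construction introduced by this change.
    S3Stream openStreamSketch(long streamId, long epoch, long startOffset, long nextOffset) {
        StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
            .withCompactedStreamObjectMaxSize(config.s3StreamObjectCompactionMaxSize())
            .withCompactableStreamObjectLivingTimeInMs(config.s3StreamObjectCompactionLivingTimeThreshold());
        // S3Stream now calls builder.withStream(this).build() in its constructor,
        // so callers no longer invoke initCompactionTask() after construction.
        return new S3Stream(streamId, epoch, startOffset, nextOffset,
            storage, streamManager, builder, id -> null);
    }

    // Shutdown goes through the interface, so AlwaysSuccessClient no longer needs
    // an instanceof S3StreamClient check before delegating.
    void shutdownSketch(StreamClient streamClient) {
        streamClient.shutdown();
    }

Because the compaction executor is created with removeOnCancelPolicy set to true, cancelling scheduledCompactionTaskFuture with cancel(false) in S3StreamClient.shutdown() also removes the pending compaction run from the executor's queue instead of leaving it behind until the executor terminates.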