diff --git a/core/src/main/scala/kafka/log/s3/S3Storage.java b/core/src/main/scala/kafka/log/s3/S3Storage.java
index 2e2815b04c..d0e885b9c3 100644
--- a/core/src/main/scala/kafka/log/s3/S3Storage.java
+++ b/core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -42,8 +42,8 @@ public class S3Storage implements Storage {
     private final BlockingQueue<WalWriteRequest> waitingLogConfirmedRequests;
     private final WriteAheadLog log;
     private final LogCache logCache;
-    private final AtomicLong logConfirmPosition = new AtomicLong();
-    private final AtomicLong processedLogConfirmPosition = new AtomicLong();
+    private final AtomicLong logConfirmOffset = new AtomicLong();
+    private final AtomicLong processedLogConfirmOffset = new AtomicLong();
     private final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor(
             ThreadUtils.createThreadFactory("s3-storage-main", false));
     private final ScheduledExecutorService backgroundExecutor = Executors.newSingleThreadScheduledExecutor(
@@ -73,7 +73,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         FlatStreamRecordBatch flatStreamRecordBatch = FlatStreamRecordBatch.from(streamRecord);
         WriteAheadLog.AppendResult appendResult = log.append(flatStreamRecordBatch.encodedBuf.duplicate());
         CompletableFuture<Void> cf = new CompletableFuture<>();
-        WalWriteRequest writeRequest = new WalWriteRequest(flatStreamRecordBatch, appendResult.endPosition, cf);
+        WalWriteRequest writeRequest = new WalWriteRequest(flatStreamRecordBatch, appendResult.offset, cf);
         try {
             waitingLogConfirmedRequests.put(writeRequest);
         } catch (InterruptedException e) {
@@ -82,7 +82,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         appendResult.future.thenAccept(nil -> {
             // TODO: callback is out of order, we need reorder ack in stream dimension.
             // TODO: cache end offset update should consider log hollow.
-            logConfirmPosition.getAndUpdate(operand -> Math.max(operand, appendResult.endPosition));
+            logConfirmOffset.getAndUpdate(operand -> Math.max(operand, appendResult.offset));
            putToCache(writeRequest);
             tryCallback();
         });
@@ -93,6 +93,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {

     @Override
     public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
+        // TODO: thread model to keep data safe.
         List<FlatStreamRecordBatch> records = logCache.get(streamId, startOffset, endOffset, maxBytes);
         if (!records.isEmpty()) {
             return CompletableFuture.completedFuture(new ReadDataBlock(StreamRecordBatchCodec.decode(records)));
@@ -110,29 +111,29 @@ public CompletableFuture read(long streamId, long startOffset, lo
     }

     private void tryCallback() {
-        if (processedLogConfirmPosition.get() == logConfirmPosition.get()) {
+        if (processedLogConfirmOffset.get() == logConfirmOffset.get()) {
             return;
         }
         mainExecutor.execute(this::tryCallback0);
     }

     private void tryCallback0() {
-        long walConfirmOffset = this.logConfirmPosition.get();
+        long walConfirmOffset = this.logConfirmOffset.get();
         for (; ; ) {
             WalWriteRequest request = waitingLogConfirmedRequests.peek();
             if (request == null) break;
-            if (request.position <= walConfirmOffset) {
+            if (request.offset <= walConfirmOffset) {
                 waitingLogConfirmedRequests.poll();
                 request.cf.complete(null);
             } else {
                 break;
             }
         }
-        processedLogConfirmPosition.set(walConfirmOffset);
+        processedLogConfirmOffset.set(walConfirmOffset);
     }

     private void putToCache(WalWriteRequest request) {
-        if (logCache.put(request.record, request.position)) {
+        if (logCache.put(request.record, request.offset)) {
             // cache block is full, trigger WAL object upload.
             LogCache.LogCacheBlock logCacheBlock = logCache.archiveCurrentBlock();
             uploadWALObject(logCacheBlock);
@@ -150,7 +151,7 @@ private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock) {
             walObjectUploadTask.prepare().get();
             walObjectUploadTask.upload().get();
             walObjectUploadTask.commit().get();
-            log.trim(logCacheBlock.logEndPosition());
+            log.trim(logCacheBlock.maxOffset());
             freeCache(logCacheBlock.blockId());
         } catch (Throwable e) {
             LOGGER.error("unexpect upload wal object fail", e);
diff --git a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
index d0aa0f53f8..ed237fbb3e 100644
--- a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
+++ b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
@@ -17,7 +17,7 @@

 package kafka.log.s3;

-import kafka.log.s3.objects.CommitCompactObjectRequest;
+import kafka.log.s3.objects.CommitWALObjectRequest;
 import kafka.log.s3.objects.ObjectManager;
 import kafka.log.s3.objects.ObjectStreamRange;
 import kafka.log.s3.objects.StreamObject;
@@ -40,7 +40,7 @@ public class WALObjectUploadTask {
     private final ObjectManager objectManager;
     private final S3Operator s3Operator;
     private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
-    private final CompletableFuture<CommitCompactObjectRequest> uploadCf = new CompletableFuture<>();
+    private final CompletableFuture<CommitWALObjectRequest> uploadCf = new CompletableFuture<>();

     public WALObjectUploadTask(Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap, int streamSplitSizeThreshold, ObjectManager objectManager, S3Operator s3Operator) {
         this.streamRecordsMap = streamRecordsMap;
@@ -58,12 +58,11 @@ public CompletableFuture<Long> prepare() {
         return prepareCf;
     }

-    public CompletableFuture<CommitCompactObjectRequest> upload() {
+    public CompletableFuture<CommitWALObjectRequest> upload() {
         prepareCf.thenAccept(objectId -> {
             List<Long> streamIds = new ArrayList<>(streamRecordsMap.keySet());
             Collections.sort(streamIds);
-            CommitCompactObjectRequest compactRequest = new CommitCompactObjectRequest();
-            compactRequest.setCompactedObjectIds(Collections.emptyList());
+            CommitWALObjectRequest compactRequest = new CommitWALObjectRequest();

             ObjectWriter minorCompactObject = new ObjectWriter(objectId, s3Operator);
@@ -86,6 +85,7 @@ public CompletableFuture upload() {
                 }
             }
             compactRequest.setObjectId(objectId);
+            compactRequest.setOrderId(objectId);
             CompletableFuture<Void> minorCompactObjectCf = minorCompactObject.close().thenAccept(nil -> {
                 compactRequest.setObjectSize(minorCompactObject.size());
             });
@@ -107,7 +107,7 @@ public CompletableFuture<CommitCompactObjectRequest> upload() {
     }

     public CompletableFuture<Void> commit() {
-        return uploadCf.thenCompose(objectManager::commitMinorCompactObject);
+        return uploadCf.thenCompose(request -> objectManager.commitWALObject(request).thenApply(resp -> null));
     }

     private CompletableFuture<StreamObject> writeStreamObject(List<FlatStreamRecordBatch> streamRecords) {
diff --git a/core/src/main/scala/kafka/log/s3/WalWriteRequest.java b/core/src/main/scala/kafka/log/s3/WalWriteRequest.java
index de6aeb581b..fb29497641 100644
--- a/core/src/main/scala/kafka/log/s3/WalWriteRequest.java
+++ b/core/src/main/scala/kafka/log/s3/WalWriteRequest.java
@@ -22,12 +22,12 @@
 public class WalWriteRequest implements Comparable<WalWriteRequest> {
     final FlatStreamRecordBatch record;
-    final long position;
+    final long offset;
     final CompletableFuture<Void> cf;

-    public WalWriteRequest(FlatStreamRecordBatch record, long position, CompletableFuture<Void> cf) {
+    public WalWriteRequest(FlatStreamRecordBatch record, long offset, CompletableFuture<Void> cf) {
         this.record = record;
-        this.position = position;
+        this.offset = offset;
         this.cf = cf;
     }
diff --git a/core/src/main/scala/kafka/log/s3/cache/LogCache.java b/core/src/main/scala/kafka/log/s3/cache/LogCache.java
index a8d8612112..fdf381f6f6 100644
--- a/core/src/main/scala/kafka/log/s3/cache/LogCache.java
+++ b/core/src/main/scala/kafka/log/s3/cache/LogCache.java
@@ -37,8 +37,8 @@ public LogCache(int cacheBlockMaxSize) {
         this.activeBlock = new LogCacheBlock(cacheBlockMaxSize);
     }

-    public boolean put(FlatStreamRecordBatch recordBatch, long endPosition) {
-        return activeBlock.put(recordBatch, endPosition);
+    public boolean put(FlatStreamRecordBatch recordBatch, long offset) {
+        return activeBlock.put(recordBatch, offset);
     }

     /**
@@ -84,7 +84,7 @@ public static class LogCacheBlock {
         private final int maxSize;
         private final Map<Long, List<FlatStreamRecordBatch>> map = new HashMap<>();
         private int size = 0;
-        private long logEndPosition;
+        private long maxOffset;

         public LogCacheBlock(int maxSize) {
             this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
@@ -95,12 +95,12 @@ public long blockId() {
             return blockId;
         }

-        public boolean put(FlatStreamRecordBatch recordBatch, long endPosition) {
+        public boolean put(FlatStreamRecordBatch recordBatch, long offset) {
             List<FlatStreamRecordBatch> streamCache = map.computeIfAbsent(recordBatch.streamId, id -> new ArrayList<>());
             streamCache.add(recordBatch);
             int recordSize = recordBatch.encodedBuf.readableBytes();
             size += recordSize;
-            logEndPosition = endPosition;
+            maxOffset = offset;
             return size >= maxSize;
         }
@@ -136,8 +136,8 @@ public Map<Long, List<FlatStreamRecordBatch>> records() {
             return map;
         }

-        public long logEndPosition() {
-            return logEndPosition;
+        public long maxOffset() {
+            return maxOffset;
         }
     }
diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
index c2787a273e..041a69c06d 100644
--- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
+++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
@@ -31,11 +31,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+
 import kafka.log.s3.model.StreamOffset;
 import kafka.log.s3.objects.CommitCompactObjectRequest;
 import kafka.log.s3.objects.CommitStreamObjectRequest;
-import kafka.log.s3.objects.CommitWalObjectRequest;
-import kafka.log.s3.objects.CommitWalObjectResponse;
+import kafka.log.s3.objects.CommitWALObjectRequest;
+import kafka.log.s3.objects.CommitWALObjectResponse;
 import kafka.log.s3.objects.ObjectManager;
 import kafka.log.s3.objects.ObjectStreamRange;
 import kafka.log.s3.objects.OpenStreamMetadata;
@@ -145,11 +146,9 @@ public CompletableFuture<Long> prepareObject(int count, long ttl) {
     }

     @Override
-    public CompletableFuture<CommitWalObjectResponse> commitWalObject(CommitWalObjectRequest request) {
+    public CompletableFuture<CommitWALObjectResponse> commitWALObject(CommitWALObjectRequest request) {
         return this.submitEvent(() -> {
-            CommitWalObjectResponse resp = new CommitWalObjectResponse();
-            List<Long> failedStreamIds = new ArrayList<>();
-            resp.setFailedStreamIds(failedStreamIds);
+            CommitWALObjectResponse resp = new CommitWALObjectResponse();
             long objectId = request.getObjectId();
             long objectSize = request.getObjectSize();
             List<ObjectStreamRange> streamRanges = request.getStreamRanges();
@@ -160,21 +159,15 @@ public CompletableFuture commitWalObjec
             if (object.getS3ObjectState() != S3ObjectState.PREPARED) {
                 throw new RuntimeException("Object " + objectId + " is not in prepared state");
             }
-            // verify the stream
-            streamRanges.stream().filter(range -> !verifyWalStreamRanges(range)).mapToLong(ObjectStreamRange::getStreamId)
-                .forEach(failedStreamIds::add);
-            if (!failedStreamIds.isEmpty()) {
-                return resp;
-            }
             // commit object
             this.objectsMetadata.put(objectId, new S3Object(
-                objectId, objectSize, object.getObjectKey(),
-                object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1,
-                S3ObjectState.COMMITTED)
+                    objectId, objectSize, object.getObjectKey(),
+                    object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1,
+                    S3ObjectState.COMMITTED)
             );
             // build metadata
             MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.computeIfAbsent(MOCK_BROKER_ID,
-                k -> new MemoryBrokerWALMetadata(k));
+                    k -> new MemoryBrokerWALMetadata(k));
             Map<Long, List<S3ObjectStreamIndex>> index = new HashMap<>();
             streamRanges.stream().forEach(range -> {
                 long streamId = range.getStreamId();
@@ -185,7 +178,15 @@ public CompletableFuture commitWalObjec
                 MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
                 streamMetadata.endOffset = endOffset;
             });
-            S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index);
+            request.getStreamObjects().forEach(streamObject -> {
+                MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamObject.getStreamId());
+                S3StreamObject s3StreamObject = new S3StreamObject(streamObject.getObjectId(), streamObject.getObjectSize(),
+                        streamObject.getStreamId(), streamObject.getStartOffset(), streamObject.getEndOffset());
+                streamMetadata.addStreamObject(s3StreamObject);
+                streamMetadata.endOffset = Math.max(streamMetadata.endOffset, streamObject.getEndOffset());
+            });
+
+            S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index, request.getOrderId());
             walMetadata.walObjects.add(walObject);
             return resp;
         });
@@ -209,11 +210,6 @@ private boolean verifyWalStreamRanges(ObjectStreamRange range) {
         return true;
     }

-    @Override
-    public CompletableFuture<Void> commitMinorCompactObject(CommitCompactObjectRequest request) {
-        return CompletableFuture.completedFuture(null);
-    }
-
     @Override
     public CompletableFuture<Void> commitMajorCompactObject(CommitCompactObjectRequest request) {
         return null;
@@ -282,7 +278,7 @@ public CompletableFuture<Long> createStream() {
         return this.submitEvent(() -> {
             long streamId = this.nextAssignedStreamId++;
             this.streamsMetadata.put(streamId,
-                new MemoryStreamMetadata(streamId));
+                    new MemoryStreamMetadata(streamId));
             return streamId;
         });
     }
@@ -373,9 +369,9 @@ private S3Object prepareObject(long objectId, long ttl) {
         long preparedTs = System.currentTimeMillis();
         String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
         return new S3Object(
-            objectId, -1, objectKey,
-            preparedTs, preparedTs + ttl, -1, -1,
-            S3ObjectState.PREPARED);
+                objectId, -1, objectKey,
+                preparedTs, preparedTs + ttl, -1, -1,
+                S3ObjectState.PREPARED);
     }
diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectRequest.java b/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java
similarity index 56%
rename from core/src/main/scala/kafka/log/s3/objects/CommitWalObjectRequest.java
rename to core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java
index 92f6fab04c..2ac3dcaa95 100644
--- a/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectRequest.java
+++ b/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java
@@ -17,9 +17,11 @@

 package kafka.log.s3.objects;

+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;

-public class CommitWalObjectRequest {
+public class CommitWALObjectRequest {
     private long objectId;
     private long objectSize;
     /**
@@ -27,6 +29,12 @@ public class CommitWalObjectRequest {
      */
     private List<ObjectStreamRange> streamRanges;

+    /**
+     * The stream objects which split from the compacted object.
+     */
+    private List<StreamObject> streamObjects;
+    private long orderId;
+
     public long getObjectId() {
         return objectId;
     }
@@ -44,10 +52,46 @@ public void setObjectSize(long objectSize) {
     }

     public List<ObjectStreamRange> getStreamRanges() {
+        if (streamRanges == null) {
+            return Collections.emptyList();
+        }
         return streamRanges;
     }

     public void setStreamRanges(List<ObjectStreamRange> streamRanges) {
         this.streamRanges = streamRanges;
     }
+
+    public void addStreamRange(ObjectStreamRange streamRange) {
+        if (streamRanges == null) {
+            streamRanges = new LinkedList<>();
+        }
+        streamRanges.add(streamRange);
+    }
+
+    public List<StreamObject> getStreamObjects() {
+        if (streamObjects == null) {
+            return Collections.emptyList();
+        }
+        return streamObjects;
+    }
+
+    public void setStreamObjects(List<StreamObject> streamObjects) {
+        this.streamObjects = streamObjects;
+    }
+
+    public void addStreamObject(StreamObject streamObject) {
+        if (streamObjects == null) {
+            streamObjects = new LinkedList<>();
+        }
+        streamObjects.add(streamObject);
+    }
+
+    public long getOrderId() {
+        return orderId;
+    }
+
+    public void setOrderId(long orderId) {
+        this.orderId = orderId;
+    }
 }
diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectResponse.java b/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectResponse.java
similarity index 60%
rename from core/src/main/scala/kafka/log/s3/objects/CommitWalObjectResponse.java
rename to core/src/main/scala/kafka/log/s3/objects/CommitWALObjectResponse.java
index 27c2a5f07d..6bd622fbcf 100644
--- a/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectResponse.java
+++ b/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectResponse.java
@@ -17,24 +17,8 @@

 package kafka.log.s3.objects;

-import java.util.Collections;
-import java.util.List;
-
 /**
- * Commit wal loose object response.
- * When stream is fenced, the stream id will be added to failedStreamIds.
+ * Commit WAL object response.
  */
-public class CommitWalObjectResponse {
-    private List<Long> failedStreamIds;
-
-    public List<Long> getFailedStreamIds() {
-        if (failedStreamIds == null) {
-            return Collections.emptyList();
-        }
-        return failedStreamIds;
-    }
-
-    public void setFailedStreamIds(List<Long> failedStreamIds) {
-        this.failedStreamIds = failedStreamIds;
-    }
+public class CommitWALObjectResponse {
 }
diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
index f6b106dd7f..635d759c1c 100644
--- a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
+++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
@@ -69,12 +69,7 @@ public CompletableFuture<Long> prepareObject(int count, long ttl) {
     }

     @Override
-    public CompletableFuture<CommitWalObjectResponse> commitWalObject(CommitWalObjectRequest request) {
-        return null;
-    }
-
-    @Override
-    public CompletableFuture<Void> commitMinorCompactObject(CommitCompactObjectRequest request) {
+    public CompletableFuture<CommitWALObjectResponse> commitWALObject(CommitWALObjectRequest request) {
         return null;
     }
diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
index a51caa3d0d..08d86deb84 100644
--- a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
+++ b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
@@ -36,19 +36,12 @@ public interface ObjectManager {
     CompletableFuture<Long> prepareObject(int count, long ttl);

     /**
-     * Commit wal object.
+     * Commit WAL object.
      *
-     * @param request {@link CommitWalObjectRequest}
-     * @return {@link CommitWalObjectResponse}
+     * @param request {@link CommitWALObjectRequest}
+     * @return {@link CommitWALObjectResponse}
      */
-    CompletableFuture<CommitWalObjectResponse> commitWalObject(CommitWalObjectRequest request);
-
-    /**
-     * Commit minor compact object. Use minor compact object and stream objects to substitute wal object.
-     *
-     * @param request {@link CommitCompactObjectRequest}
-     */
-    CompletableFuture<Void> commitMinorCompactObject(CommitCompactObjectRequest request);
+    CompletableFuture<CommitWALObjectResponse> commitWALObject(CommitWALObjectRequest request);

     /**
      * Commit major compact object. Use major compact object and stream objects to substitute minor compact object.
diff --git a/core/src/main/scala/kafka/log/s3/wal/MemoryWriteAheadLog.java b/core/src/main/scala/kafka/log/s3/wal/MemoryWriteAheadLog.java
index cd302ed70a..25fa15c1db 100644
--- a/core/src/main/scala/kafka/log/s3/wal/MemoryWriteAheadLog.java
+++ b/core/src/main/scala/kafka/log/s3/wal/MemoryWriteAheadLog.java
@@ -28,12 +28,12 @@ public class MemoryWriteAheadLog implements WriteAheadLog {
     private final AtomicLong offsetAlloc = new AtomicLong();

     @Override
-    public long startPosition() {
+    public long startOffset() {
         return 0;
     }

     @Override
-    public long endPosition() {
+    public long endOffset() {
         return 0;
     }
@@ -45,13 +45,13 @@ public List<WalRecord> read() {
     @Override
     public AppendResult append(ByteBuf data) {
         AppendResult appendResult = new AppendResult();
-        appendResult.endPosition = offsetAlloc.getAndIncrement();
+        appendResult.offset = offsetAlloc.getAndIncrement();
         appendResult.future = CompletableFuture.completedFuture(null);
         return appendResult;
     }

     @Override
-    public void trim(long position) {
+    public void trim(long offset) {

     }
 }
diff --git a/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java b/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java
index bf1fa9c261..27e0d4310c 100644
--- a/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java
+++ b/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java
@@ -26,16 +26,16 @@ public interface WriteAheadLog {

     /**
-     * Get log start position.
-     * @return start position.
+     * Get log start offset.
+     * @return start offset.
      */
-    long startPosition();
+    long startOffset();

     /**
-     * Get log end position.
-     * @return end position.
+     * Get log end offset.
+     * @return exclusive end offset.
      */
-    long endPosition();
+    long endOffset();

     /**
      * Read data from log.
      *
@@ -52,24 +52,24 @@ public interface WriteAheadLog {
     AppendResult append(ByteBuf data);

     /**
-     * Trim log to new start position.
+     * Trim data <= offset in log.
      *
-     * @param position new start position.
+     * @param offset inclusive trim offset.
      */
-    void trim(long position);
+    void trim(long offset);

     class WalRecord {
-        private long endPosition;
-        private ByteBuf data;
+        private final long offset;
+        private final ByteBuf data;

-        public WalRecord(long endPosition, ByteBuf data) {
-            this.endPosition = endPosition;
+        public WalRecord(long offset, ByteBuf data) {
+            this.offset = offset;
             this.data = data;
         }

-        public long endPosition() {
-            return endPosition;
+        public long offset() {
+            return offset;
         }

         public ByteBuf data() {
@@ -78,7 +78,7 @@ public ByteBuf data() {
         }
     }

     class AppendResult {
-        public long endPosition;
+        public long offset;
         public CompletableFuture<Void> future;
     }
diff --git a/core/src/test/java/kafka/log/s3/S3StorageTest.java b/core/src/test/java/kafka/log/s3/S3StorageTest.java
index dd71a70bb0..b6672bd644 100644
--- a/core/src/test/java/kafka/log/s3/S3StorageTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StorageTest.java
@@ -20,7 +20,7 @@
 import kafka.log.s3.cache.DefaultS3BlockCache;
 import kafka.log.s3.cache.ReadDataBlock;
 import kafka.log.s3.model.StreamRecordBatch;
-import kafka.log.s3.objects.CommitWalObjectResponse;
+import kafka.log.s3.objects.CommitWALObjectResponse;
 import kafka.log.s3.objects.ObjectManager;
 import kafka.log.s3.operator.MemoryS3Operator;
 import kafka.log.s3.operator.S3Operator;
@@ -54,8 +54,8 @@ public void setup() {
     @Test
     public void testAppend() throws Exception {
         when(objectManager.prepareObject(eq(1), anyLong())).thenReturn(CompletableFuture.completedFuture(16L));
-        CommitWalObjectResponse resp = new CommitWalObjectResponse();
-        when(objectManager.commitWalObject(any())).thenReturn(CompletableFuture.completedFuture(resp));
+        CommitWALObjectResponse resp = new CommitWALObjectResponse();
+        when(objectManager.commitWALObject(any())).thenReturn(CompletableFuture.completedFuture(resp));

         CompletableFuture<Void> cf1 = storage.append(new StreamRecordBatch(233, 1, 10, DefaultRecordBatch.of(1, 100)));
         CompletableFuture<Void> cf2 = storage.append(new StreamRecordBatch(233, 1, 11, DefaultRecordBatch.of(2, 100)));
diff --git a/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java b/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java
index d21df1b009..cf6a889f66 100644
--- a/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java
+++ b/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java
@@ -18,7 +18,8 @@
 package kafka.log.s3;

 import kafka.log.s3.model.StreamRecordBatch;
-import kafka.log.s3.objects.CommitCompactObjectRequest;
+import kafka.log.s3.objects.CommitWALObjectRequest;
+import kafka.log.s3.objects.CommitWALObjectResponse;
 import kafka.log.s3.objects.ObjectManager;
 import kafka.log.s3.objects.StreamObject;
 import kafka.log.s3.operator.MemoryS3Operator;
@@ -58,7 +59,7 @@ public void setup() {
     public void testTryCompact() throws Exception {
         AtomicLong objectIdAlloc = new AtomicLong(10);
         doAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong());
-        when(objectManager.commitMinorCompactObject(any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(objectManager.commitWALObject(any())).thenReturn(CompletableFuture.completedFuture(new CommitWALObjectResponse()));

         Map<Long, List<StreamRecordBatch>> map = new HashMap<>();
         map.put(233L, List.of(
@@ -78,12 +79,12 @@ public void testTryCompact() throws Exception {

         walObjectUploadTask.commit().get();

-        ArgumentCaptor<CommitCompactObjectRequest> reqArg = ArgumentCaptor.forClass(CommitCompactObjectRequest.class);
-        verify(objectManager, times(1)).commitMinorCompactObject(reqArg.capture());
+        ArgumentCaptor<CommitWALObjectRequest> reqArg = ArgumentCaptor.forClass(CommitWALObjectRequest.class);
+        verify(objectManager, times(1)).commitWALObject(reqArg.capture());
         // expect
         // - stream233 split
         // - stream234 write to one stream range
-        CommitCompactObjectRequest request = reqArg.getValue();
+        CommitWALObjectRequest request = reqArg.getValue();
         assertEquals(10, request.getObjectId());
         assertEquals(1, request.getStreamRanges().size());
         assertEquals(234, request.getStreamRanges().get(0).getStreamId());
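
Usage note (not part of the patch): the sketch below shows how a caller is expected to assemble and commit the renamed request type. It is a minimal illustration rather than code from this change; objectId, objectSize, range, and streamObject are placeholders for values that WALObjectUploadTask#upload() actually computes, and only methods defined on CommitWALObjectRequest in this patch are used.

    // Build the commit request for a WAL object produced from one cache block.
    CommitWALObjectRequest request = new CommitWALObjectRequest();
    request.setObjectId(objectId);      // object id returned by ObjectManager#prepareObject
    request.setOrderId(objectId);       // order id currently reuses the object id
    request.setObjectSize(objectSize);
    // streamRanges and streamObjects are lazily initialized, so the getters are
    // null-safe and return an empty list until something is added.
    request.addStreamRange(range);            // data kept inline in the WAL object
    request.addStreamObject(streamObject);    // data split out as a separate stream object
    // Returns CompletableFuture<CommitWALObjectResponse>; unlike the old
    // CommitWalObjectResponse, the response no longer carries a failed-stream list.
    objectManager.commitWALObject(request);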