diff --git a/core/src/main/scala/kafka/log/s3/ObjectReader.java b/core/src/main/scala/kafka/log/s3/ObjectReader.java index c60961678a..025a387abb 100644 --- a/core/src/main/scala/kafka/log/s3/ObjectReader.java +++ b/core/src/main/scala/kafka/log/s3/ObjectReader.java @@ -64,20 +64,20 @@ public CompletableFuture read(DataBlockIndex block) { } private void asyncGetBasicObjectInfo() { - asyncGetBasicObjectInfo0(Math.max(0, metadata.getObjectSize() - 1024 * 1024)); + asyncGetBasicObjectInfo0(Math.max(0, metadata.objectSize() - 1024 * 1024)); } private void asyncGetBasicObjectInfo0(long startPosition) { - CompletableFuture cf = s3Operator.rangeRead(objectKey, startPosition, metadata.getObjectSize()); + CompletableFuture cf = s3Operator.rangeRead(objectKey, startPosition, metadata.objectSize()); cf.thenAccept(buf -> { try { - BasicObjectInfo basicObjectInfo = BasicObjectInfo.parse(buf, metadata.getObjectSize()); + BasicObjectInfo basicObjectInfo = BasicObjectInfo.parse(buf, metadata.objectSize()); basicObjectInfoCf.complete(basicObjectInfo); } catch (IndexBlockParseException ex) { asyncGetBasicObjectInfo0(ex.indexBlockPosition); } }).exceptionally(ex -> { - LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.getObjectSize(), ex); + LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.objectSize(), ex); // TODO: delay retry. asyncGetBasicObjectInfo0(startPosition); return null; diff --git a/core/src/main/scala/kafka/log/s3/StreamObjectsCompactionTask.java b/core/src/main/scala/kafka/log/s3/StreamObjectsCompactionTask.java index 5d20f0a96a..f1607df51c 100644 --- a/core/src/main/scala/kafka/log/s3/StreamObjectsCompactionTask.java +++ b/core/src/main/scala/kafka/log/s3/StreamObjectsCompactionTask.java @@ -30,8 +30,6 @@ import kafka.log.s3.operator.S3Operator; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.metadata.stream.S3ObjectMetadata; -import org.apache.kafka.metadata.stream.S3ObjectType; -import org.apache.kafka.metadata.stream.S3StreamObjectMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +39,7 @@ */ public class StreamObjectsCompactionTask { private static final Logger LOGGER = LoggerFactory.getLogger(StreamObjectsCompactionTask.class); - private Queue> compactGroups; + private Queue> compactGroups; private final long compactedStreamObjectMaxSize; private final long compactableStreamObjectLivingTimeInMs; private long nextStartSearchingOffset; @@ -58,16 +56,14 @@ public StreamObjectsCompactionTask(ObjectManager objectManager, S3Operator s3Ope this.nextStartSearchingOffset = stream.startOffset(); } - private CompletableFuture doCompaction(List streamObjectMetadataList) { - List objectMetadatas = streamObjectMetadataList.stream().map(metadata -> - new S3ObjectMetadata(metadata.objectId(), metadata.objectSize(), S3ObjectType.STREAM) - ).collect(Collectors.toList()); + private CompletableFuture doCompaction(List streamObjectMetadataList) { + List objectMetadatas = streamObjectMetadataList; long startOffset = streamObjectMetadataList.get(0).startOffset(); long endOffset = streamObjectMetadataList.get(streamObjectMetadataList.size() - 1).endOffset(); List sourceObjectIds = streamObjectMetadataList .stream() - .map(S3StreamObjectMetadata::objectId) + .map(S3ObjectMetadata::objectId) .collect(Collectors.toList()); if (stream.isClosed()) { @@ -99,7 +95,7 @@ private CompletableFuture doCompaction(List stream public CompletableFuture doCompactions() { CompletableFuture 
lastCompactionFuture = CompletableFuture.completedFuture(null); while (!compactGroups.isEmpty()) { - List streamObjectMetadataList = compactGroups.poll(); + List streamObjectMetadataList = compactGroups.poll(); CompletableFuture future = new CompletableFuture<>(); lastCompactionFuture.whenComplete((v, ex) -> { if (ex != null) { @@ -132,17 +128,16 @@ public void prepare() { * @param startSearchingOffset start searching offset. * @return compact groups. */ - public Queue> prepareCompactGroups(long startSearchingOffset) { + public Queue> prepareCompactGroups(long startSearchingOffset) { long startOffset = Utils.max(startSearchingOffset, stream.startOffset()); - List rawFetchedStreamObjects = objectManager + List rawFetchedStreamObjects = objectManager .getStreamObjects(stream.streamId(), startOffset, stream.nextOffset(), Integer.MAX_VALUE) .stream() - .sorted() .collect(Collectors.toList()); this.nextStartSearchingOffset = calculateNextStartSearchingOffset(rawFetchedStreamObjects, startOffset); - List streamObjects = rawFetchedStreamObjects + List streamObjects = rawFetchedStreamObjects .stream() .filter(streamObject -> streamObject.objectSize() < compactedStreamObjectMaxSize) .collect(Collectors.toList()); @@ -170,7 +165,7 @@ public void close() {} * @param rawStartSearchingOffset raw start searching offset. * @return next start searching offset. */ - private long calculateNextStartSearchingOffset(List streamObjects, + private long calculateNextStartSearchingOffset(List streamObjects, long rawStartSearchingOffset) { long lastEndOffset = rawStartSearchingOffset; if (streamObjects == null || streamObjects.isEmpty()) { @@ -193,16 +188,16 @@ private long calculateNextStartSearchingOffset(List stre * @param streamObjects stream objects. * @return object groups. 
*/ - private List> groupContinuousObjects(List streamObjects) { + private List> groupContinuousObjects(List streamObjects) { if (streamObjects == null || streamObjects.size() <= 1) { return new LinkedList<>(); } - List> stackList = new LinkedList<>(); - Stack stack = new Stack<>(); + List> stackList = new LinkedList<>(); + Stack stack = new Stack<>(); stackList.add(stack); - for (S3StreamObjectMetadata object : streamObjects) { + for (S3ObjectMetadata object : streamObjects) { if (stack.isEmpty()) { stack.push(object); } else { @@ -234,19 +229,19 @@ private List> groupContinuousObjects(List> groupEligibleObjects(List streamObjects) { + private Queue> groupEligibleObjects(List streamObjects) { if (streamObjects == null || streamObjects.size() <= 1) { return new LinkedList<>(); } - Queue> groups = new LinkedList<>(); + Queue> groups = new LinkedList<>(); int startIndex = 0; int endIndex = 0; while (startIndex < streamObjects.size() - 1) { endIndex = startIndex + 1; while (endIndex <= streamObjects.size()) { - List subGroup = streamObjects.subList(startIndex, endIndex); + List subGroup = streamObjects.subList(startIndex, endIndex); // The subgroup is too new or too big, then break; if (calculateTimePassedInMs(subGroup) < compactableStreamObjectLivingTimeInMs || calculateTotalSize(subGroup) > compactedStreamObjectMaxSize) { @@ -264,12 +259,12 @@ private Queue> groupEligibleObjects(List streamObjects) { - return System.currentTimeMillis() - streamObjects.stream().mapToLong(S3StreamObjectMetadata::timestamp).max().orElse(0L); + private long calculateTimePassedInMs(List streamObjects) { + return System.currentTimeMillis() - streamObjects.stream().mapToLong(S3ObjectMetadata::committedTimestamp).max().orElse(0L); } - private long calculateTotalSize(List streamObjects) { - return streamObjects.stream().mapToLong(S3StreamObjectMetadata::objectSize).sum(); + private long calculateTotalSize(List streamObjects) { + return streamObjects.stream().mapToLong(S3ObjectMetadata::objectSize).sum(); } public static class HaltException extends RuntimeException { diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java index 302a46cae8..e20f209568 100644 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -31,7 +31,6 @@ import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.S3ObjectState; -import org.apache.kafka.metadata.stream.S3StreamObjectMetadata; import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; @@ -159,10 +158,11 @@ public CompletableFuture commitWALObject(CommitWALObjec throw new RuntimeException("Object " + objectId + " is not in prepared state"); } // commit object - this.objectsMetadata.put(objectId, new S3Object( - objectId, objectSize, object.getObjectKey(), - object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1, - S3ObjectState.COMMITTED) + S3Object s3Object = new S3Object( + objectId, objectSize, object.getObjectKey(), + object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1, + S3ObjectState.COMMITTED); + this.objectsMetadata.put(objectId, s3Object ); // build metadata MemoryBrokerWALMetadata walMetadata = 
this.brokerWALMetadata.computeIfAbsent(MOCK_BROKER_ID, @@ -184,8 +184,7 @@ public CompletableFuture commitWALObject(CommitWALObjec streamMetadata.addStreamObject(s3StreamObject); streamMetadata.endOffset = Math.max(streamMetadata.endOffset, streamObject.getEndOffset()); }); - - S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index, request.getOrderId()); + S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index, request.getOrderId(), s3Object.getCommittedTimeInMs()); walMetadata.walObjects.add(walObject); return resp; }); @@ -231,6 +230,7 @@ public List getServerObjects() { } } + @Override public List getObjects(long streamId, long startOffset, long endOffset, int limit) { // TODO: support search not only in wal objects @@ -269,8 +269,8 @@ public List getObjects(long streamId, long startOffset, long e } @Override - public List getStreamObjects(long streamId, long startOffset, long endOffset, int limit) { - return Collections.emptyList(); + public List getStreamObjects(long streamId, long startOffset, long endOffset, int limit) { + throw new UnsupportedOperationException("Not support"); } @Override diff --git a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java index 9f305f408f..8ed6a4d477 100644 --- a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java @@ -29,8 +29,8 @@ import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.S3StreamObject; -import org.apache.kafka.metadata.stream.S3StreamObjectMetadata; import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.raft.OffsetAndEpoch; import org.slf4j.Logger; @@ -89,6 +89,9 @@ private void onImageChanged(MetadataDelta delta, MetadataImage newImage) { // remove all catch up pending tasks List retryTasks = removePendingTasks(); // retry all pending tasks + if (retryTasks == null || retryTasks.isEmpty()) { + return; + } this.pendingExecutorService.submit(() -> { retryPendingTasks(retryTasks); }); @@ -166,13 +169,16 @@ public CompletableFuture fetch(long streamId, long startOffset, } } - public CompletableFuture> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) { + public CompletableFuture> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) { synchronized (StreamMetadataManager.this) { try { List streamObjects = streamsImage.getStreamObjects(streamId, startOffset, endOffset, limit); - List s3StreamObjectMetadataList = streamObjects.stream().map(object -> { - long committedTimeInMs = objectsImage.getObjectMetadata(object.objectId()).getCommittedTimeInMs(); - return new S3StreamObjectMetadata(object, committedTimeInMs); + List s3StreamObjectMetadataList = streamObjects.stream().map(object -> { + S3Object objectMetadata = objectsImage.getObjectMetadata(object.objectId()); + long committedTimeInMs = objectMetadata.getCommittedTimeInMs(); + long objectSize = objectMetadata.getObjectSize(); + return new S3ObjectMetadata(object.objectId(), object.objectType(), List.of(object.streamOffsetRange()), object.dataTimeInMs(), + committedTimeInMs, objectSize, S3StreamConstant.INVALID_ORDER_ID); }).collect(Collectors.toList()); return 
CompletableFuture.completedFuture(s3StreamObjectMetadataList); } catch (Exception e) { @@ -205,9 +211,9 @@ private CompletableFuture fetch0(long streamId, long startOffset streamId, startOffset, endOffset, limit); return CompletableFuture.completedFuture(InRangeObjects.INVALID); } - // fill the objects' size + // fill the objects' size and committed-timestamp for (S3ObjectMetadata object : cachedInRangeObjects.objects()) { - S3Object objectMetadata = objectsImage.getObjectMetadata(object.getObjectId()); + S3Object objectMetadata = objectsImage.getObjectMetadata(object.objectId()); if (objectMetadata == null) { // should not happen LOGGER.error( @@ -216,6 +222,7 @@ private CompletableFuture fetch0(long streamId, long startOffset return CompletableFuture.completedFuture(InRangeObjects.INVALID); } object.setObjectSize(objectMetadata.getObjectSize()); + object.setCommittedTimestamp(objectMetadata.getCommittedTimeInMs()); } LOGGER.trace( "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}", @@ -224,17 +231,18 @@ private CompletableFuture fetch0(long streamId, long startOffset } public void retryPendingTasks(List tasks) { - if (!tasks.isEmpty()) { - LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", tasks.size()); - tasks.forEach(task -> { - long streamId = task.streamId; - long startOffset = task.startOffset; - long endOffset = task.endOffset; - int limit = task.limit; - CompletableFuture newCf = this.fetch(streamId, startOffset, endOffset, limit); - FutureUtil.propagate(newCf, task.cf); - }); + if (tasks == null || tasks.isEmpty()) { + return; } + LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", tasks.size()); + tasks.forEach(task -> { + long streamId = task.streamId; + long startOffset = task.startOffset; + long endOffset = task.endOffset; + int limit = task.limit; + CompletableFuture newCf = this.fetch(streamId, startOffset, endOffset, limit); + FutureUtil.propagate(newCf, task.cf); + }); } static class GetObjectsTask { diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java index 8d8e28d3e1..f406a0887b 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest.Builder; import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.S3ObjectMetadata; -import org.apache.kafka.metadata.stream.S3StreamObjectMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,18 +139,19 @@ public List getObjects(long streamId, long startOffset, long e } @Override - public List getStreamObjects(long streamId, long startOffset, long endOffset, int limit) { + public List getServerObjects() { + return null; + } + + @Override + public List getStreamObjects(long streamId, long startOffset, long endOffset, int limit) { try { return this.metadataManager.getStreamObjects(streamId, startOffset, endOffset, limit).get(); } catch (Exception e) { - LOGGER.error("Error while get objects, streamId: {}, startOffset: {}, endOffset: {}, limit: {}", streamId, startOffset, endOffset, limit, + LOGGER.error("Error while get stream objects, streamId: {}, startOffset: {}, endOffset: {}, limit: {}", streamId, startOffset, endOffset, + limit, e); return Collections.emptyList(); } } - - @Override - public List 
getServerObjects() { - return null; - } } diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java index 46ef97e74f..3c5fb0c969 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java +++ b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java @@ -17,11 +17,9 @@ package kafka.log.s3.objects; -import org.apache.kafka.metadata.stream.S3ObjectMetadata; - import java.util.List; import java.util.concurrent.CompletableFuture; -import org.apache.kafka.metadata.stream.S3StreamObjectMetadata; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; /** * Object metadata registry. @@ -56,29 +54,38 @@ public interface ObjectManager { * Get objects by stream range. * When obj1 contains stream0 [0, 100) [200, 300) and obj2 contains stream1 [100, 200), * expect getObjects(streamId, 0, 300) return [obj1, obj2, obj1] - * - * @param streamId stream id. + *
+ * <ul>
+ * <li>Concerns two types of objects: stream objects and WAL objects.</li>
+ * <li>Returned objects must be continuous within the stream range.</li>
+ * <li>Returned objects are not physical objects but logical ones
+ * (regard each returned object metadata as a slice of an object).</li>
+ * </ul>
+ *
+ * @param streamId stream id. * @param startOffset get range start offset. - * @param endOffset get range end offset. - * @param limit max object count. Why use limit instead of maxBytes? Because we cannot get stream size from object metadata. + * @param endOffset get range end offset. + * @param limit max object range count. * @return {@link S3ObjectMetadata} */ List getObjects(long streamId, long startOffset, long endOffset, int limit); - /** - * Get stream objects by stream range and limit. - * @param streamId stream id. - * @param startOffset searching start offset in the stream. - * @param endOffset searching end offset in the stream. - * @param limit max object count. - * @return {@link S3StreamObjectMetadata} - */ - List getStreamObjects(long streamId, long startOffset, long endOffset, int limit); - /** * Get current server wal objects. * When server is starting, wal need server wal objects to recover. */ List getServerObjects(); + + /** + * Get stream objects by stream range. + *
+ * <ul>
+ * <li>Only concerns stream objects; WAL objects are ignored.</li>
+ * <li>Returned stream objects may be discontinuous within the stream range.</li>
+ * </ul>
+ *
+ * @param streamId stream id. + * @param startOffset get range start offset. + * @param endOffset get range end offset. + * @param limit max object count. + * @return {@link S3ObjectMetadata} + */ + List getStreamObjects(long streamId, long startOffset, long endOffset, int limit); } diff --git a/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java index ecae3c66f6..5c895db7d4 100644 --- a/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java +++ b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java @@ -31,9 +31,8 @@ import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3Object; -import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.S3ObjectState; -import org.apache.kafka.metadata.stream.S3ObjectType; +import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.StreamState; import org.junit.jupiter.api.BeforeEach; @@ -98,7 +97,7 @@ public void setUp() { 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 100L, BROKER0) ); Map streamObjects = Map.of( - 0L, new S3StreamObject(0L, 128, STREAM0, 10L, 100L)); + 0L, new S3StreamObject(0L, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS)); S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects); S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); @@ -107,7 +106,7 @@ public void setUp() { ranges = new HashMap<>(ranges); ranges.put(1, new RangeMetadata(STREAM0, 1L, 1, 100L, 150L, BROKER0)); streamObjects = new HashMap<>(streamObjects); - streamObjects.put(1L, new S3StreamObject(1L, 128, STREAM0, 100L, 150L)); + streamObjects.put(1L, new S3StreamObject(1L, STREAM0, 100L, 150L, S3StreamConstant.INVALID_TS)); streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); @@ -116,7 +115,7 @@ public void setUp() { ranges = new HashMap<>(ranges); ranges.put(2, new RangeMetadata(STREAM0, 2L, 2, 150L, 200L, BROKER0)); streamObjects = new HashMap<>(streamObjects); - streamObjects.put(2L, new S3StreamObject(2L, 128, STREAM0, 150L, 200L)); + streamObjects.put(2L, new S3StreamObject(2L, STREAM0, 150L, 200L, S3StreamConstant.INVALID_TS)); streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); @@ -125,9 +124,6 @@ public void setUp() { @Test public void testFetch() throws Exception { - S3ObjectMetadata object0 = new S3ObjectMetadata(0L, 128, S3ObjectType.STREAM); - S3ObjectMetadata object1 = new S3ObjectMetadata(1L, 128, S3ObjectType.STREAM); - S3ObjectMetadata object2 = new S3ObjectMetadata(2L, 128, S3ObjectType.STREAM); this.streamMetadataListener.onChange(null, image0); @@ -138,7 +134,7 @@ public void testFetch() throws Exception { assertEquals(10L, inRangeObjects.startOffset()); assertEquals(100L, inRangeObjects.endOffset()); assertEquals(1, inRangeObjects.objects().size()); - assertEquals(object0, 
inRangeObjects.objects().get(0)); + assertEquals(0L, inRangeObjects.objects().get(0).objectId()); // 2. fetch with invalid streamId result = this.manager.fetch(STREAM1, 0L, 100L, 5); @@ -152,7 +148,7 @@ public void testFetch() throws Exception { assertEquals(20L, inRangeObjects.startOffset()); assertEquals(100L, inRangeObjects.endOffset()); assertEquals(1, inRangeObjects.objects().size()); - assertEquals(object0, inRangeObjects.objects().get(0)); + assertEquals(0L, inRangeObjects.objects().get(0).objectId()); // 4. fetch with smaller endOffset result = this.manager.fetch(STREAM0, 10L, 50L, 5); @@ -161,7 +157,7 @@ public void testFetch() throws Exception { assertEquals(10L, inRangeObjects.startOffset()); assertEquals(100L, inRangeObjects.endOffset()); assertEquals(1, inRangeObjects.objects().size()); - assertEquals(object0, inRangeObjects.objects().get(0)); + assertEquals(0L, inRangeObjects.objects().get(0).objectId()); // 5. fetch with smaller startOffset result = this.manager.fetch(STREAM0, 5L, 100L, 5); @@ -191,9 +187,9 @@ public void testFetch() throws Exception { assertEquals(10L, rangeObjects.startOffset()); assertEquals(200L, rangeObjects.endOffset()); assertEquals(3, rangeObjects.objects().size()); - assertEquals(object0, rangeObjects.objects().get(0)); - assertEquals(object1, rangeObjects.objects().get(1)); - assertEquals(object2, rangeObjects.objects().get(2)); + assertEquals(0L, rangeObjects.objects().get(0).objectId()); + assertEquals(1L, rangeObjects.objects().get(1).objectId()); + assertEquals(2L, rangeObjects.objects().get(2).objectId()); }); } diff --git a/core/src/test/java/kafka/log/s3/StreamObjectsCompactionTaskTest.java b/core/src/test/java/kafka/log/s3/StreamObjectsCompactionTaskTest.java index e54010eb94..8412d20c3d 100644 --- a/core/src/test/java/kafka/log/s3/StreamObjectsCompactionTaskTest.java +++ b/core/src/test/java/kafka/log/s3/StreamObjectsCompactionTaskTest.java @@ -33,8 +33,10 @@ import kafka.log.s3.objects.StreamObject; import kafka.log.s3.operator.MemoryS3Operator; import kafka.log.s3.operator.S3Operator; -import org.apache.kafka.metadata.stream.S3StreamObject; -import org.apache.kafka.metadata.stream.S3StreamObjectMetadata; +import org.apache.kafka.metadata.stream.S3ObjectType; +import org.apache.kafka.metadata.stream.S3StreamConstant; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -52,6 +54,7 @@ @Tag("S3Unit") class StreamObjectsCompactionTaskTest { + private ObjectManager objectManager; private S3Operator s3Operator; private S3Stream stream; @@ -75,7 +78,7 @@ void testTriggerTask() throws ExecutionException, InterruptedException { List.of(65L, 70L, 1000L), List.of(70L, 80L, 1000L) ); - List metadataList = prepareRawStreamObjects(10, stream.streamId(), objectsDetails); + List metadataList = prepareRawStreamObjects(10, stream.streamId(), objectsDetails); // two stream object groups should be handled when(objectManager.getStreamObjects(anyLong(), anyLong(), anyLong(), anyInt())) @@ -84,7 +87,8 @@ void testTriggerTask() throws ExecutionException, InterruptedException { AtomicLong objectIdAlloc = new AtomicLong(100); List committedRequests = new ArrayList<>(); - when(objectManager.prepareObject(anyInt(), anyLong())).thenAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())); + when(objectManager.prepareObject(anyInt(), anyLong())).thenAnswer( + 
invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())); when(objectManager.commitStreamObject(any(CommitStreamObjectRequest.class))).thenAnswer(invocation -> { committedRequests.add(invocation.getArgument(0)); return CompletableFuture.completedFuture(null); @@ -114,14 +118,20 @@ void testTriggerTaskFailure() throws InterruptedException { // 2 compaction groups will be handled. when(objectManager.getStreamObjects(anyLong(), anyLong(), anyLong(), anyInt())) .thenReturn(List.of( - new S3StreamObjectMetadata(new S3StreamObject(1, 150, 1, 5, 10), 0), - new S3StreamObjectMetadata(new S3StreamObject(2, 20, 1, 10, 20), 0), - new S3StreamObjectMetadata(new S3StreamObject(3, 20, 1, 40, 50), 0), - new S3StreamObjectMetadata(new S3StreamObject(4, 20, 1, 50, 60), 0), - new S3StreamObjectMetadata(new S3StreamObject(5, 20, 1, 65, 70), 0), - new S3StreamObjectMetadata(new S3StreamObject(6, 20, 1, 70, 80), 0) - )); - when(objectManager.prepareObject(anyInt(), anyLong())).thenReturn(CompletableFuture.failedFuture(new RuntimeException("halt compaction task"))); + new S3ObjectMetadata(1, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 5, 10)), 0, 0, 150, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(2, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 10, 20)), 0, 0, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(3, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 40, 50)), 0, 0, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(4, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 50, 60)), 0, 0, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(5, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 65, 70)), 0, 0, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(6, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 70, 80)), 0, 0, 20, + S3StreamConstant.INVALID_ORDER_ID))); + when(objectManager.prepareObject(anyInt(), anyLong())).thenReturn( + CompletableFuture.failedFuture(new RuntimeException("halt compaction task"))); // The first group's compaction failed in prepareObject phase, the second group should not be handled. 
StreamObjectsCompactionTask task = new StreamObjectsCompactionTask(objectManager, s3Operator, stream, 100, 0); @@ -157,17 +167,18 @@ void testTriggerTaskFailure() throws InterruptedException { * @throws ExecutionException * @throws InterruptedException */ - List prepareRawStreamObjects(long startObjectId, long streamId, + List prepareRawStreamObjects(long startObjectId, long streamId, List> objectsDetails) throws ExecutionException, InterruptedException { AtomicLong objectIdAlloc = new AtomicLong(startObjectId); Stack commitWALObjectRequests = new Stack<>(); - doAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong()); + doAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())).when(objectManager) + .prepareObject(anyInt(), anyLong()); when(objectManager.commitWALObject(any())).thenAnswer(invocation -> { commitWALObjectRequests.push(invocation.getArgument(0)); return CompletableFuture.completedFuture(new CommitWALObjectResponse()); }); - List metadataList = new ArrayList<>(); + List metadataList = new ArrayList<>(); for (int i = 0; i < objectsDetails.size(); i++) { List objectsDetail = objectsDetails.get(i); @@ -175,8 +186,10 @@ List prepareRawStreamObjects(long startObjectId, long st long endOffset = objectsDetail.get(1); int recordsSize = Math.toIntExact(objectsDetail.get(2)); - Map> map = Map.of(streamId, List.of(new StreamRecordBatch(streamId, 0, startOffset, Math.toIntExact(endOffset - startOffset), random(recordsSize)))); - WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(map, objectManager, s3Operator, 16 * 1024 * 1024, 16 * 1024 * 1024, recordsSize - 1); + Map> map = Map.of(streamId, + List.of(new StreamRecordBatch(streamId, 0, startOffset, Math.toIntExact(endOffset - startOffset), random(recordsSize)))); + WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(map, objectManager, s3Operator, 16 * 1024 * 1024, 16 * 1024 * 1024, + recordsSize - 1); walObjectUploadTask.prepare().get(); walObjectUploadTask.upload().get(); @@ -192,7 +205,10 @@ List prepareRawStreamObjects(long startObjectId, long st assertEquals(startOffset, streamObject.getStartOffset()); assertEquals(endOffset, streamObject.getEndOffset()); - metadataList.add(new S3StreamObjectMetadata(new S3StreamObject(streamObject.getObjectId(), streamObject.getObjectSize(), streamObject.getStreamId(), streamObject.getStartOffset(), streamObject.getEndOffset()), System.currentTimeMillis())); + metadataList.add( + new S3ObjectMetadata(streamObject.getObjectId(), S3ObjectType.STREAM, List.of(new StreamOffsetRange(streamObject.getStreamId(), + streamObject.getStartOffset(), streamObject.getEndOffset())), 0, System.currentTimeMillis(), streamObject.getObjectSize(), + S3StreamConstant.INVALID_ORDER_ID)); } return metadataList; } @@ -205,20 +221,31 @@ void testPrepareCompactGroups() { long currentTimestamp = System.currentTimeMillis(); when(objectManager.getStreamObjects(anyLong(), anyLong(), anyLong(), anyInt())) .thenReturn(List.of( - new S3StreamObjectMetadata(new S3StreamObject(1, 150, 1, 5, 10), currentTimestamp), - new S3StreamObjectMetadata(new S3StreamObject(2, 20, 1, 10, 20), currentTimestamp), - new S3StreamObjectMetadata(new S3StreamObject(3, 20, 1, 40, 50), currentTimestamp), - new S3StreamObjectMetadata(new S3StreamObject(4, 20, 1, 50, 60), currentTimestamp), - new S3StreamObjectMetadata(new S3StreamObject(5, 20, 1, 65, 70), currentTimestamp), - new S3StreamObjectMetadata(new 
S3StreamObject(6, 20, 1, 70, 80), currentTimestamp) - )); - Queue> compactGroups = task1.prepareCompactGroups(0); + new S3ObjectMetadata(1, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 5, 10)), 0, currentTimestamp, 150, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(2, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 10, 20)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(3, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 40, 50)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(4, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 50, 60)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(5, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 65, 70)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(6, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 70, 80)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID))); + Queue> compactGroups = task1.prepareCompactGroups(0); assertEquals(2, compactGroups.size()); - assertEquals(List.of(new S3StreamObjectMetadata(new S3StreamObject(3, 20, 1, 40, 50), currentTimestamp), - new S3StreamObjectMetadata(new S3StreamObject(4, 20, 1, 50, 60), currentTimestamp)), compactGroups.poll()); - assertEquals(List.of(new S3StreamObjectMetadata(new S3StreamObject(5, 20, 1, 65, 70), currentTimestamp), - new S3StreamObjectMetadata(new S3StreamObject(6, 20, 1, 70, 80), currentTimestamp)), compactGroups.poll()); + assertEquals(List.of( + new S3ObjectMetadata(3, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 40, 50)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(4, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 50, 60)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID)), compactGroups.poll()); + assertEquals(List.of( + new S3ObjectMetadata(5, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 65, 70)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(6, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 70, 80)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID)), compactGroups.poll()); assertEquals(10, task1.getNextStartSearchingOffset()); // check if we can filter two groups with limit of timestamp @@ -227,22 +254,34 @@ void testPrepareCompactGroups() { currentTimestamp = System.currentTimeMillis(); when(objectManager.getStreamObjects(anyLong(), anyLong(), anyLong(), anyInt())) .thenReturn(List.of( - new S3StreamObjectMetadata(new S3StreamObject(1, 60, 1, 5, 10), currentTimestamp - 20000), - new S3StreamObjectMetadata(new S3StreamObject(2, 20, 1, 10, 40), currentTimestamp), - new S3StreamObjectMetadata(new S3StreamObject(3, 20, 1, 40, 50), currentTimestamp - 20000), - new S3StreamObjectMetadata(new S3StreamObject(4, 20, 1, 50, 60), currentTimestamp), - new S3StreamObjectMetadata(new S3StreamObject(5, 20, 1, 60, 70), currentTimestamp - 30000), - new S3StreamObjectMetadata(new S3StreamObject(6, 20, 1, 70, 80), currentTimestamp - 30000), - new S3StreamObjectMetadata(new S3StreamObject(7, 20, 1, 80, 90), currentTimestamp - 30000), - new S3StreamObjectMetadata(new S3StreamObject(8, 80, 1, 90, 95), currentTimestamp - 30000), - new S3StreamObjectMetadata(new S3StreamObject(9, 20, 1, 95, 99), currentTimestamp) - )); + new S3ObjectMetadata(1, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 5, 10)), 0, currentTimestamp - 20000, 60, + 
S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(2, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 10, 40)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(3, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 40, 50)), 0, currentTimestamp - 20000, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(4, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 50, 60)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(5, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 60, 70)), 0, currentTimestamp - 30000, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(6, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 70, 80)), 0, currentTimestamp - 30000, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(7, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 80, 90)), 0, currentTimestamp - 30000, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(8, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 90, 95)), 0, currentTimestamp - 30000, 80, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(9, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 95, 99)), 0, currentTimestamp, 20, + S3StreamConstant.INVALID_ORDER_ID))); compactGroups = task2.prepareCompactGroups(0); assertEquals(1, compactGroups.size()); - assertEquals(List.of(new S3StreamObjectMetadata(new S3StreamObject(5, 20, 1, 60, 70), currentTimestamp - 30000), - new S3StreamObjectMetadata(new S3StreamObject(6, 20, 1, 70, 80), currentTimestamp - 30000), - new S3StreamObjectMetadata(new S3StreamObject(7, 20, 1, 80, 90), currentTimestamp - 30000)), compactGroups.poll()); + assertEquals(List.of( + new S3ObjectMetadata(5, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 60, 70)), 0, currentTimestamp - 30000, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(6, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 70, 80)), 0, currentTimestamp - 30000, 20, + S3StreamConstant.INVALID_ORDER_ID), + new S3ObjectMetadata(7, S3ObjectType.STREAM, List.of(new StreamOffsetRange(1, 80, 90)), 0, currentTimestamp - 30000, 20, + S3StreamConstant.INVALID_ORDER_ID)), compactGroups.poll()); assertEquals(5, task2.getNextStartSearchingOffset()); } } \ No newline at end of file diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index 290898a9ad..292e57676a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -146,7 +146,7 @@ public ControllerResult prepareObject(PrepareS3Obje long expiredTs = preparedTs + request.timeToLiveInMs(); S3ObjectRecord record = new S3ObjectRecord() .setObjectId(objectId) - .setObjectState((byte) S3ObjectState.PREPARED.ordinal()) + .setObjectState(S3ObjectState.PREPARED.toByte()) .setPreparedTimeInMs(preparedTs) .setExpiredTimeInMs(expiredTs); records.add(new ApiMessageAndVersion(record, (short) 0)); @@ -155,7 +155,7 @@ public ControllerResult prepareObject(PrepareS3Obje return ControllerResult.atomicOf(records, response); } - public ControllerResult commitObject(long objectId, long objectSize) { + public ControllerResult commitObject(long objectId, long objectSize, long committedTs) { S3Object object = this.objectsMetadata.get(objectId); if (object == null) { log.error("object 
{} not exist when commit wal object", objectId); @@ -176,7 +176,7 @@ public ControllerResult commitObject(long objectId, long objectSize) { .setObjectState(S3ObjectState.COMMITTED.toByte()) .setPreparedTimeInMs(object.getPreparedTimeInMs()) .setExpiredTimeInMs(object.getExpiredTimeInMs()) - .setCommittedTimeInMs(System.currentTimeMillis()); + .setCommittedTimeInMs(committedTs); return ControllerResult.of(List.of( new ApiMessageAndVersion(record, (short) 0)), Errors.NONE); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 55ce9ac0ba..aff2f46483 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -363,8 +363,11 @@ public ControllerResult commitWALObject(CommitWALOb long orderId = data.orderId(); List streamRanges = data.objectStreamRanges(); List compactedObjectIds = data.compactedObjectIds(); + List streamObjects = data.streamObjects(); + long committedTs = System.currentTimeMillis(); + // commit object - ControllerResult commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize); + ControllerResult commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize, committedTs); if (commitResult.response() == Errors.OBJECT_NOT_EXIST) { log.error("[CommitWALObject]: object {} not exist when commit wal object", objectId); resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code()); @@ -376,6 +379,7 @@ public ControllerResult commitWALObject(CommitWALOb return ControllerResult.of(Collections.emptyList(), resp); } records.addAll(commitResult.records()); + long dataTs = committedTs; // mark destroy compacted object if (compactedObjectIds != null && !compactedObjectIds.isEmpty()) { ControllerResult destroyResult = this.s3ObjectControlManager.markDestroyObjects(compactedObjectIds); @@ -385,8 +389,12 @@ public ControllerResult commitWALObject(CommitWALOb return ControllerResult.of(Collections.emptyList(), resp); } records.addAll(destroyResult.records()); + // update dataTs to the min compacted object's dataTs + dataTs = compactedObjectIds.stream() + .map(id -> this.brokersMetadata.get(brokerId).walObjects.get(id)) + .map(S3WALObject::dataTimeInMs) + .min(Long::compareTo).get(); } - List indexes = streamRanges.stream() .map(range -> new StreamOffsetRange(range.streamId(), range.startOffset(), range.endOffset())) .collect(Collectors.toList()); @@ -400,26 +408,42 @@ public ControllerResult commitWALObject(CommitWALOb // generate broker's wal object record records.add(new ApiMessageAndVersion(new WALObjectRecord() .setObjectId(objectId) + .setDataTimeInMs(dataTs) .setOrderId(orderId) .setBrokerId(brokerId) .setStreamsIndex( indexes.stream() .map(StreamOffsetRange::toRecordStreamIndex) .collect(Collectors.toList())), (short) 0)); - // create stream object records - List streamObjects = data.streamObjects(); - streamObjects.stream().forEach(obj -> { - long streamId = obj.streamId(); - long startOffset = obj.startOffset(); - long endOffset = obj.endOffset(); - records.add(new S3StreamObject(obj.objectId(), obj.objectSize(), streamId, startOffset, endOffset).toRecord()); - }); + // commit stream objects + if (streamObjects != null && !streamObjects.isEmpty()) { + // commit objects + for (StreamObject streamObject : streamObjects) { + ControllerResult streamObjectCommitResult = 
this.s3ObjectControlManager.commitObject(streamObject.objectId(), + streamObject.objectSize(), committedTs); + if (streamObjectCommitResult.response() != Errors.NONE) { + log.error("[CommitWALObject]: stream object: {} not exist when commit wal object: {}", streamObject.objectId(), objectId); + resp.setErrorCode(streamObjectCommitResult.response().code()); + return ControllerResult.of(Collections.emptyList(), resp); + } + records.addAll(streamObjectCommitResult.records()); + } + // create stream object records + streamObjects.stream().forEach(obj -> { + long streamId = obj.streamId(); + long startOffset = obj.startOffset(); + long endOffset = obj.endOffset(); + records.add(new S3StreamObject(obj.objectId(), streamId, startOffset, endOffset, committedTs).toRecord()); + }); + } + // generate compacted objects' remove record if (compactedObjectIds != null && !compactedObjectIds.isEmpty()) { - // generate compacted objects' remove record compactedObjectIds.forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord() + .setBrokerId(brokerId) .setObjectId(id), (short) 0))); } - log.info("[CommitWALObject]: broker: {} commit wal object: {} success, compacted objects: {}", brokerId, objectId, compactedObjectIds); + log.info("[CommitWALObject]: broker: {} commit wal object: {} success, compacted objects: {}, stream objects: {}", brokerId, objectId, + compactedObjectIds, streamObjects); return ControllerResult.atomicOf(records, resp); } @@ -432,9 +456,10 @@ public ControllerResult commitStreamObject(Commi List sourceObjectIds = data.sourceObjectIds(); List records = new ArrayList<>(); CommitStreamObjectResponseData resp = new CommitStreamObjectResponseData(); + long committedTs = System.currentTimeMillis(); // commit object - ControllerResult commitResult = this.s3ObjectControlManager.commitObject(streamObjectId, objectSize); + ControllerResult commitResult = this.s3ObjectControlManager.commitObject(streamObjectId, objectSize, committedTs); if (commitResult.response() == Errors.OBJECT_NOT_EXIST) { log.error("[CommitStreamObject]: object {} not exist when commit stream object", streamObjectId); resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code()); @@ -445,7 +470,9 @@ public ControllerResult commitStreamObject(Commi log.warn("[CommitStreamObject]: object {} already committed", streamObjectId); return ControllerResult.of(Collections.emptyList(), resp); } + records.addAll(commitResult.records()); + long dataTs = committedTs; // mark destroy compacted object if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) { ControllerResult destroyResult = this.s3ObjectControlManager.markDestroyObjects(sourceObjectIds); @@ -454,15 +481,21 @@ public ControllerResult commitStreamObject(Commi resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); return ControllerResult.of(Collections.emptyList(), resp); } + records.addAll(destroyResult.records()); + // update dataTs to the min compacted object's dataTs + dataTs = sourceObjectIds.stream() + .map(id -> this.streamsMetadata.get(streamId).streamObjects.get(id)) + .map(S3StreamObject::dataTimeInMs) + .min(Long::compareTo).get(); } // generate stream object record records.add(new ApiMessageAndVersion(new S3StreamObjectRecord() .setObjectId(streamObjectId) .setStreamId(streamId) - .setObjectSize(objectSize) .setStartOffset(startOffset) - .setEndOffset(endOffset), (short) 0)); + .setEndOffset(endOffset) + .setDataTimeInMs(dataTs), (short) 0)); // generate compacted objects' remove record if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) { @@ -557,6 
+590,7 @@ public void replay(WALObjectRecord record) { long objectId = record.objectId(); int brokerId = record.brokerId(); long orderId = record.orderId(); + long dataTs = record.dataTimeInMs(); List streamIndexes = record.streamsIndex(); BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId); if (brokerMetadata == null) { @@ -570,7 +604,7 @@ public void replay(WALObjectRecord record) { .stream() .map(StreamOffsetRange::of) .collect(Collectors.groupingBy(StreamOffsetRange::getStreamId)); - brokerMetadata.walObjects.put(objectId, new S3WALObject(objectId, brokerId, indexMap, orderId)); + brokerMetadata.walObjects.put(objectId, new S3WALObject(objectId, brokerId, indexMap, orderId, dataTs)); // update range record.streamsIndex().forEach(index -> { @@ -609,7 +643,7 @@ public void replay(S3StreamObjectRecord record) { long streamId = record.streamId(); long startOffset = record.startOffset(); long endOffset = record.endOffset(); - long objectSize = record.objectSize(); + long dataTs = record.dataTimeInMs(); S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); if (streamMetadata == null) { @@ -617,7 +651,7 @@ public void replay(S3StreamObjectRecord record) { log.error("stream {} not exist when replay stream object record {}", streamId, record); return; } - streamMetadata.streamObjects.put(objectId, new S3StreamObject(objectId, objectSize, streamId, startOffset, endOffset)); + streamMetadata.streamObjects.put(objectId, new S3StreamObject(objectId, streamId, startOffset, endOffset, dataTs)); // update range RangeMetadata rangeMetadata = streamMetadata.currentRangeMetadata(); if (rangeMetadata == null) { diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 005f27edec..27bb183a88 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -33,7 +33,6 @@ import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.StreamOffsetRange; -import org.apache.kafka.metadata.stream.S3ObjectType; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -99,12 +98,12 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset } /** - * Get stream objects in range [startOffset, endOffset) with limit. - * It will throw IllegalArgumentException if limit or streamId is invalid. - * @param streamId stream id + * Get stream objects in range [startOffset, endOffset) with limit. It will throw IllegalArgumentException if limit or streamId is invalid. 
+ * + * @param streamId stream id * @param startOffset inclusive start offset of the stream - * @param endOffset exclusive end offset of the stream - * @param limit max number of stream objects to return + * @param endOffset exclusive end offset of the stream + * @param limit max number of stream objects to return * @return stream objects */ public List getStreamObjects(long streamId, long startOffset, long endOffset, int limit) { @@ -164,12 +163,12 @@ public RangeSearcher(long startOffset, long endOffset, long streamId, int broker this.brokerId = brokerId; } - private Queue rangeOfWalObjects() { + private Queue rangeOfWalObjects() { BrokerS3WALMetadataImage wal = brokerWALMetadata.get(brokerId); return wal.getWalObjects().list().stream() - .filter(obj -> obj.streamsIndex().containsKey(streamId) && obj.streamsIndex().get(streamId).size() != 0) + .filter(obj -> obj.offsetRanges().containsKey(streamId) && obj.offsetRanges().get(streamId).size() != 0) .flatMap(obj -> { - List indexes = obj.streamsIndex().get(streamId); + List indexes = obj.offsetRanges().get(streamId); // TODO: pre filter useless objects return indexes.stream().filter(index -> { long objectStartOffset = index.getStartOffset(); @@ -178,12 +177,17 @@ private Queue rangeOfWalObjects() { }).map(index -> { long startOffset = index.getStartOffset(); long endOffset = index.getEndOffset(); - return new ObjectStreamRange(obj.objectId(), obj.objectType(), startOffset, endOffset); + List offsetRanges = obj.offsetRanges().values().stream().flatMap(List::stream).sorted() + .collect(Collectors.toList()); + S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata( + obj.objectId(), obj.objectType(), offsetRanges, obj.dataTimeInMs(), + obj.orderId()); + return new S3ObjectMetadataWrapper(s3ObjectMetadata, startOffset, endOffset); }); }).collect(Collectors.toCollection(LinkedList::new)); } - private Queue rangeOfStreamObjects() { + private Queue rangeOfStreamObjects() { S3StreamMetadataImage stream = streamsMetadata.get(streamId); Map streamObjectsMetadata = stream.getStreamObjects(); // TODO: refactor to make stream objects in order @@ -192,10 +196,12 @@ private Queue rangeOfStreamObjects() { long objectStartOffset = obj.streamOffsetRange().getStartOffset(); long objectEndOffset = obj.streamOffsetRange().getEndOffset(); return objectStartOffset < endOffset && objectEndOffset > startOffset; - }).sorted(Comparator.comparingLong(S3StreamObject::objectId)).map(obj -> { + }).sorted(Comparator.comparing(S3StreamObject::streamOffsetRange)).map(obj -> { long startOffset = obj.streamOffsetRange().getStartOffset(); long endOffset = obj.streamOffsetRange().getEndOffset(); - return new ObjectStreamRange(obj.objectId(), obj.objectType(), startOffset, endOffset); + S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata( + obj.objectId(), obj.objectType(), List.of(obj.streamOffsetRange()), obj.dataTimeInMs()); + return new S3ObjectMetadataWrapper(s3ObjectMetadata, startOffset, endOffset); }).collect(Collectors.toCollection(LinkedList::new)); } return new LinkedList<>(); @@ -209,15 +215,15 @@ public InRangeObjects getObjects(int limit) { return InRangeObjects.INVALID; } - Queue streamObjects = rangeOfStreamObjects(); - Queue walObjects = rangeOfWalObjects(); + Queue streamObjects = rangeOfStreamObjects(); + Queue walObjects = rangeOfWalObjects(); List inRangeObjects = new ArrayList<>(); long nextStartOffset = startOffset; while (limit > 0 && nextStartOffset < endOffset && (!streamObjects.isEmpty() || !walObjects.isEmpty())) { - ObjectStreamRange 
streamRange = null; + S3ObjectMetadataWrapper streamRange = null; if (walObjects.isEmpty() || (!streamObjects.isEmpty() && streamObjects.peek().startOffset() < walObjects.peek().startOffset())) { streamRange = streamObjects.poll(); } else { @@ -231,7 +237,7 @@ public InRangeObjects getObjects(int limit) { if (objectEndOffset <= nextStartOffset) { continue; } - inRangeObjects.add(streamRange.toS3ObjectMetadata()); + inRangeObjects.add(streamRange.metadata); limit--; nextStartOffset = objectEndOffset; } @@ -240,22 +246,20 @@ public InRangeObjects getObjects(int limit) { } - static class ObjectStreamRange { + static class S3ObjectMetadataWrapper { - private final long objectId; - private final S3ObjectType objectType; + private final S3ObjectMetadata metadata; private final long startOffset; private final long endOffset; - public ObjectStreamRange(long objectId, S3ObjectType objectType, long startOffset, long endOffset) { - this.objectId = objectId; - this.objectType = objectType; + public S3ObjectMetadataWrapper(S3ObjectMetadata metadata, long startOffset, long endOffset) { + this.metadata = metadata; this.startOffset = startOffset; this.endOffset = endOffset; } - public long objectId() { - return objectId; + public S3ObjectMetadata metadata() { + return metadata; } public long startOffset() { @@ -265,10 +269,6 @@ public long startOffset() { public long endOffset() { return endOffset; } - - public S3ObjectMetadata toS3ObjectMetadata() { - return new S3ObjectMetadata(objectId, -1, objectType); - } } @Override diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java index 0fe45c173a..d4d9122cae 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java @@ -18,28 +18,88 @@ package org.apache.kafka.metadata.stream; +import java.util.Collections; +import java.util.List; import java.util.Objects; public class S3ObjectMetadata { + private final long objectId; + + /** + * order id of the object. + *
+ * <ul>
+ * <li>WAL object: the order id of the WAL object.</li>
+ * <li>STREAM object: meaningless.</li>
+ * </ul>
+ */ + private final long orderId; private long objectSize; private final S3ObjectType type; + /** + * stream offset ranges of the object. + *
+ * <ul>
+ * <li>WAL object: one or more stream offset ranges.</li>
+ * <li>STREAM object: exactly one stream offset range.</li>
+ * </ul>
+ */ + private final List offsetRanges; + /** + * real committed timestamp of the data in the object. + */ + private long committedTimestamp; + /** + * logical timestamp in ms of the data in the object. + */ + private final long dataTimeInMs; + + // Only used for testing public S3ObjectMetadata(long objectId, long objectSize, S3ObjectType type) { + this(objectId, type, Collections.emptyList(), S3StreamConstant.INVALID_TS, S3StreamConstant.INVALID_TS, objectSize, + S3StreamConstant.INVALID_ORDER_ID); + } + + public S3ObjectMetadata(long objectId, S3ObjectType type, List offsetRanges, long dataTimeInMs) { + this(objectId, type, offsetRanges, dataTimeInMs, S3StreamConstant.INVALID_TS, S3StreamConstant.INVALID_OBJECT_SIZE, + S3StreamConstant.INVALID_ORDER_ID); + } + + public S3ObjectMetadata(long objectId, S3ObjectType type, List offsetRanges, long dataTimeInMs, + long orderId) { + this(objectId, type, offsetRanges, dataTimeInMs, S3StreamConstant.INVALID_TS, S3StreamConstant.INVALID_OBJECT_SIZE, + orderId); + } + + public S3ObjectMetadata( + // these four params come from S3WALObject or S3StreamObject + long objectId, S3ObjectType type, List offsetRanges, long dataTimeInMs, + // these two params come from S3Object + long committedTimestamp, long objectSize, + // this param only comes from S3WALObject + long orderId) { this.objectId = objectId; + this.orderId = orderId; this.objectSize = objectSize; this.type = type; + this.offsetRanges = offsetRanges; + this.dataTimeInMs = dataTimeInMs; + this.committedTimestamp = committedTimestamp; } public void setObjectSize(long objectSize) { this.objectSize = objectSize; } - public long getObjectId() { + public void setCommittedTimestamp(long committedTimestamp) { + this.committedTimestamp = committedTimestamp; + } + + public long objectId() { return objectId; } - public long getObjectSize() { + public long objectSize() { return objectSize; } @@ -47,6 +107,41 @@ public S3ObjectType getType() { return type; } + public long getOrderId() { + return orderId; + } + + public long committedTimestamp() { + return committedTimestamp; + } + + public long dataTimeInMs() { + return dataTimeInMs; + } + + public List getOffsetRanges() { + return offsetRanges; + } + + public long startOffset() { + if (offsetRanges == null || offsetRanges.isEmpty()) { + return S3StreamConstant.INVALID_OFFSET; + } + return offsetRanges.get(0).getStartOffset(); + } + + public long endOffset() { + if (offsetRanges == null || offsetRanges.isEmpty()) { + return S3StreamConstant.INVALID_OFFSET; + } + return offsetRanges.get(offsetRanges.size() - 1).getEndOffset(); + } + + public String toString() { + return "S3ObjectMetadata(objectId=" + objectId + ", objectSize=" + objectSize + ", type=" + type + ", offsetRanges=" + offsetRanges + + ", committedTimestamp=" + committedTimestamp + ", dataTimestamp=" + dataTimeInMs + ")"; + } + public String key() { return ObjectUtils.genKey(0, "todocluster", objectId); } @@ -60,11 +155,12 @@ public boolean equals(Object o) { return false; } S3ObjectMetadata that = (S3ObjectMetadata) o; - return objectId == that.objectId && objectSize == that.objectSize && type == that.type; + return objectId == that.objectId && orderId == that.orderId && objectSize == that.objectSize && committedTimestamp == that.committedTimestamp + && dataTimeInMs == that.dataTimeInMs && type == that.type && offsetRanges.equals(that.offsetRanges); } @Override public int hashCode() { - return Objects.hash(objectId, objectSize, type); + return Objects.hash(objectId, orderId, objectSize, type, 
offsetRanges, committedTimestamp, dataTimeInMs); } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamConstant.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamConstant.java index 1c9e9103c1..d5fa0f179c 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamConstant.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamConstant.java @@ -37,4 +37,10 @@ public class S3StreamConstant { public static final long MAX_OBJECT_ID = Long.MAX_VALUE; + public static final long INVALID_ORDER_ID = -1L; + + public static final long INVALID_TS = -1L; + + public static final long INVALID_OBJECT_SIZE = -1L; + } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java index 944ed903ae..07e3c21941 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java @@ -24,15 +24,13 @@ public class S3StreamObject { private final long objectId; - - private final long objectSize; - + private final long dataTimeInMs; private final StreamOffsetRange streamOffsetRange; - public S3StreamObject(long objectId, long objectSize, long streamId, long startOffset, long endOffset) { + public S3StreamObject(long objectId, long streamId, long startOffset, long endOffset, long dataTimeInMs) { this.objectId = objectId; - this.objectSize = objectSize; this.streamOffsetRange = new StreamOffsetRange(streamId, startOffset, endOffset); + this.dataTimeInMs = dataTimeInMs; } public StreamOffsetRange streamOffsetRange() { @@ -43,26 +41,27 @@ public long objectId() { return objectId; } - public long objectSize() { - return objectSize; - } - public S3ObjectType objectType() { return S3ObjectType.STREAM; } + public long dataTimeInMs() { + return dataTimeInMs; + } + public ApiMessageAndVersion toRecord() { return new ApiMessageAndVersion(new S3StreamObjectRecord() .setObjectId(objectId) .setStreamId(streamOffsetRange.getStreamId()) .setStartOffset(streamOffsetRange.getStartOffset()) - .setEndOffset(streamOffsetRange.getEndOffset()), (short) 0); + .setEndOffset(streamOffsetRange.getEndOffset()) + .setDataTimeInMs(dataTimeInMs), (short) 0); } public static S3StreamObject of(S3StreamObjectRecord record) { S3StreamObject s3StreamObject = new S3StreamObject( - record.objectId(), record.objectSize(), record.streamId(), - record.startOffset(), record.endOffset()); + record.objectId(), record.streamId(), + record.startOffset(), record.endOffset(), record.dataTimeInMs()); return s3StreamObject; } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObjectMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObjectMetadata.java deleted file mode 100644 index e3f56cc302..0000000000 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObjectMetadata.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
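A rough usage sketch of the enriched S3ObjectMetadata introduced above, assuming exactly the constructors and accessors shown in this patch; the object id, offsets, and timestamp are made-up illustration values, not taken from the patch or its tests:

    // Sketch only: illustrative values.
    S3ObjectMetadata metadata = new S3ObjectMetadata(
        42L,                                              // objectId
        S3ObjectType.STREAM,
        List.of(new StreamOffsetRange(0L, 100L, 200L)),   // streamId 0, offsets [100, 200)
        1690000000000L);                                  // dataTimeInMs
    assert metadata.startOffset() == 100L;                // first offset range's start
    assert metadata.endOffset() == 200L;                  // last offset range's end
    assert metadata.objectSize() == S3StreamConstant.INVALID_OBJECT_SIZE; // this constructor leaves the size unset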
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObjectMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObjectMetadata.java
deleted file mode 100644
index e3f56cc302..0000000000
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObjectMetadata.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata.stream;
-
-import java.util.Objects;
-
-public class S3StreamObjectMetadata implements Comparable {
-    private final S3StreamObject s3StreamObject;
-    private final long timestamp;
-
-    public S3StreamObjectMetadata(S3StreamObject s3StreamObject, long timestamp) {
-        this.s3StreamObject = s3StreamObject;
-        this.timestamp = timestamp;
-    }
-
-    public long startOffset() {
-        return s3StreamObject.streamOffsetRange().getStartOffset();
-    }
-
-    public long endOffset() {
-        return s3StreamObject.streamOffsetRange().getEndOffset();
-    }
-
-    public long streamId() {
-        return s3StreamObject.streamOffsetRange().getStreamId();
-    }
-
-    public long objectId() {
-        return s3StreamObject.objectId();
-    }
-
-    public long objectSize() {
-        return s3StreamObject.objectSize();
-    }
-
-    public long timestamp() {
-        return timestamp;
-    }
-
-    @Override
-    public int compareTo(S3StreamObjectMetadata o) {
-        return s3StreamObject.streamOffsetRange().compareTo(o.s3StreamObject.streamOffsetRange());
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(s3StreamObject, timestamp);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (!(o instanceof S3StreamObjectMetadata)) return false;
-        S3StreamObjectMetadata that = (S3StreamObjectMetadata) o;
-        return s3StreamObject.equals(that.s3StreamObject) && timestamp == that.timestamp;
-    }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
index 69af4385ab..08ce888c2c 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
@@ -26,30 +26,34 @@ public class S3WALObject implements Comparable {
 
+    private final long objectId;
+    private final int brokerId;
+    private final Map> streamOffsetRanges;
+
     /**
-     * The order id of the object.
-     * Sort by this field to get the order of the objects which contains logically increasing streams.
+     * The order id of the object. Sort by this field to get the order of the objects which contains logically increasing streams.
      *
-     * When compact a batch of objects to a compacted object,
-     * this compacted object's order id will be assigned the value first object's order id in this batch
+     * When compact a batch of objects to a compacted object, this compacted object's order id will be assigned the value first object's order
+     * id in this batch
     */
     private final long orderId;
-    private final long objectId;
+    private final long dataTimeInMs;
 
-    private final int brokerId;
-    private final Map> streamsIndex;
-
-    private final S3ObjectType objectType = S3ObjectType.UNKNOWN;
+    // Only used for testing
+    public S3WALObject(long objectId, int brokerId, final Map> streamOffsetRanges, long orderId) {
+        this(objectId, brokerId, streamOffsetRanges, orderId, S3StreamConstant.INVALID_TS);
+    }
 
-    public S3WALObject(long objectId, int brokerId, final Map> streamsIndex, long orderId) {
+    public S3WALObject(long objectId, int brokerId, final Map> streamOffsetRanges, long orderId, long dataTimeInMs) {
         this.orderId = orderId;
         this.objectId = objectId;
         this.brokerId = brokerId;
-        this.streamsIndex = streamsIndex;
+        this.streamOffsetRanges = streamOffsetRanges;
+        this.dataTimeInMs = dataTimeInMs;
     }
 
     public boolean intersect(long streamId, long startOffset, long endOffset) {
-        List indexes = streamsIndex.get(streamId);
+        List indexes = streamOffsetRanges.get(streamId);
         if (indexes == null || indexes.isEmpty()) {
             return false;
         }
@@ -58,8 +62,8 @@ public boolean intersect(long streamId, long startOffset, long endOffset) {
         return startOffset >= firstIndex.getStartOffset() && startOffset <= lastIndex.getEndOffset();
     }
 
-    public Map> streamsIndex() {
-        return streamsIndex;
+    public Map> offsetRanges() {
+        return streamOffsetRanges;
     }
 
     public ApiMessageAndVersion toRecord() {
@@ -67,8 +71,9 @@ public ApiMessageAndVersion toRecord() {
             .setObjectId(objectId)
             .setBrokerId(brokerId)
             .setOrderId(orderId)
+            .setDataTimeInMs(dataTimeInMs)
             .setStreamsIndex(
-                streamsIndex.values().stream().flatMap(List::stream)
+                streamOffsetRanges.values().stream().flatMap(List::stream)
                     .map(StreamOffsetRange::toRecordStreamIndex)
                     .collect(Collectors.toList())), (short) 0);
     }
@@ -78,7 +83,7 @@ public static S3WALObject of(WALObjectRecord record) {
             .map(index -> new StreamOffsetRange(index.streamId(), index.startOffset(), index.endOffset()))
             .collect(Collectors.groupingBy(StreamOffsetRange::getStreamId));
         S3WALObject s3WalObject = new S3WALObject(record.objectId(), record.brokerId(),
-            collect, record.orderId());
+            collect, record.orderId(), record.dataTimeInMs());
         return s3WalObject;
     }
 
@@ -91,13 +96,17 @@ public Long objectId() {
     }
 
     public S3ObjectType objectType() {
-        return objectType;
+        return S3ObjectType.WAL;
     }
 
     public long orderId() {
         return orderId;
     }
 
+    public long dataTimeInMs() {
+        return dataTimeInMs;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -121,8 +130,8 @@ public String toString() {
             "objectId=" + objectId +
             ", orderId=" + orderId +
             ", brokerId=" + brokerId +
-            ", streamsIndex=" + streamsIndex +
-            ", objectType=" + objectType +
+            ", streamOffsetRanges=" + streamOffsetRanges +
+            ", dataTimeInMs=" + dataTimeInMs +
             '}';
     }
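A similarly hedged sketch of the reworked S3WALObject, assuming the new five-argument constructor and the renamed offsetRanges() accessor from the hunks above; the broker id, order id, and timestamp are invented for illustration:

    // Sketch only: illustrative values.
    Map<Long, List<StreamOffsetRange>> ranges = Map.of(
        0L, List.of(new StreamOffsetRange(0L, 0L, 100L)));       // streamId 0 covers [0, 100]
    S3WALObject walObject = new S3WALObject(7L, 1, ranges, 7L, 1690000000000L);
    walObject.offsetRanges();                                     // replaces the old streamsIndex() accessor
    boolean hit = walObject.intersect(0L, 50L, 80L);              // true: start offset 50 falls inside [0, 100]
    S3ObjectType type = walObject.objectType();                   // now reports S3ObjectType.WAL rather than UNKNOWN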
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/StreamOffsetRange.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/StreamOffsetRange.java
index 581659a79b..332f4cc3c9 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/StreamOffsetRange.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/StreamOffsetRange.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.metadata.stream;
 
+import java.util.Objects;
 import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex;
 
 /**
@@ -60,6 +61,23 @@ public int compareTo(StreamOffsetRange o) {
         return res == 0 ? Long.compare(this.endOffset, o.endOffset) : res;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        StreamOffsetRange that = (StreamOffsetRange) o;
+        return streamId == that.streamId && startOffset == that.startOffset && endOffset == that.endOffset;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(streamId, startOffset, endOffset);
+    }
+
     public StreamIndex toRecordStreamIndex() {
         return new StreamIndex()
             .setStreamId(streamId)
diff --git a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
index ee3ed37f14..6f89484759 100644
--- a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
@@ -45,10 +45,10 @@
       "about": "The object id of the S3 object"
     },
     {
-      "name": "ObjectSize",
+      "name": "DataTimeInMs",
       "type": "int64",
       "versions": "0+",
-      "about": "The size of the Stream object to commit"
+      "about": "The data time of the S3 object"
     }
   ]
 }
\ No newline at end of file
diff --git a/metadata/src/main/resources/common/metadata/WALObjectRecord.json b/metadata/src/main/resources/common/metadata/WALObjectRecord.json
index df4fd0bdd4..1d676ede1f 100644
--- a/metadata/src/main/resources/common/metadata/WALObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/WALObjectRecord.json
@@ -38,6 +38,12 @@
       "versions": "0+",
       "about": "The order id of the S3 object"
     },
+    {
+      "name": "DataTimeInMs",
+      "type": "int64",
+      "versions": "0+",
+      "about": "The data time of the S3 object"
+    },
     {
       "name": "StreamsIndex",
       "type": "[]StreamIndex",
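For the schema change above, a sketch of the round trip the new DataTimeInMs field is meant to survive, using only the toRecord()/of() methods shown earlier in this patch; the concrete values are invented:

    // Sketch only: illustrative values.
    S3StreamObject original = new S3StreamObject(8L, 0L, 10L, 100L, 1690000000000L); // objectId, streamId, start, end, dataTimeInMs
    ApiMessageAndVersion record = original.toRecord();              // serializes DataTimeInMs instead of ObjectSize
    S3StreamObject restored = S3StreamObject.of((S3StreamObjectRecord) record.message());
    assert restored.dataTimeInMs() == original.dataTimeInMs();      // the data time survives the round trip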
diff --git a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
index b191ae3e61..1aff94151a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
@@ -157,19 +157,20 @@ public void testCommitObject() {
         prepareOneObject(60 * 1000);
 
         // 2. commit an object which not exist in controller
-        ControllerResult result1 = manager.commitObject(1, 1024);
+        long expectedCommittedTs = 1313L;
+        ControllerResult result1 = manager.commitObject(1, 1024, expectedCommittedTs);
         assertEquals(Errors.OBJECT_NOT_EXIST, result1.response());
         assertEquals(0, result1.records().size());
 
         // 3. commit an valid object
-        ControllerResult result2 = manager.commitObject(0, 1024);
+        ControllerResult result2 = manager.commitObject(0, 1024, expectedCommittedTs);
         assertEquals(Errors.NONE, result2.response());
         assertEquals(1, result2.records().size());
         S3ObjectRecord record = (S3ObjectRecord) result2.records().get(0).message();
         manager.replay(record);
 
         // 4. commit again
-        ControllerResult result3 = manager.commitObject(0, 1024);
+        ControllerResult result3 = manager.commitObject(0, 1024, expectedCommittedTs);
         assertEquals(Errors.REDUNDANT_OPERATION, result3.response());
         assertEquals(0, result3.records().size());
 
@@ -177,6 +178,9 @@ public void testCommitObject() {
         assertEquals(1, manager.objectsMetadata().size());
         S3Object object = manager.objectsMetadata().get(0L);
         assertEquals(S3ObjectState.COMMITTED, object.getS3ObjectState());
+        assertEquals(0L, object.getObjectId());
+        assertEquals(1024, object.getObjectSize());
+        assertEquals(expectedCommittedTs, object.getCommittedTimeInMs());
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
index a8430f41f9..8f6302e6ab 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
@@ -270,7 +270,7 @@ public void testBasicOpenCloseStream() {
 
     @Test
     public void testCommitWalBasic() {
-        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).then(ink -> {
+        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())).then(ink -> {
             long objectId = ink.getArgument(0);
             if (objectId == 1) {
                 return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
@@ -380,7 +380,7 @@ private void createAndOpenStream(int brokerId, long epoch) {
 
     @Test
     public void testCommitWalCompacted() {
-        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
+        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
         Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
 
         // 1. create and open stream_0 and stream_1
@@ -419,6 +419,7 @@ public void testCommitWalCompacted() {
         assertEquals(STREAM1, streamsOffset.streamsOffset().get(1).streamId());
         assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
         assertEquals(200L, streamsOffset.streamsOffset().get(1).endOffset());
+        long object0DataTs = manager.brokersMetadata().get(BROKER0).walObjects().get(0L).dataTimeInMs();
 
         // 4. keep committing first level object of stream_0 and stream_1
         List streamRanges1 = List.of(
@@ -451,6 +452,7 @@ public void testCommitWalCompacted() {
         assertEquals(STREAM1, streamsOffset.streamsOffset().get(1).streamId());
         assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
         assertEquals(300L, streamsOffset.streamsOffset().get(1).endOffset());
+        long object1DataTs = manager.brokersMetadata().get(BROKER0).walObjects().get(1L).dataTimeInMs();
 
         // 6. commit an invalid wal object which contains the destroyed or not exist wal object
         Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false));
@@ -498,6 +500,7 @@ public void testCommitWalCompacted() {
         assertEquals(STREAM1, streamsOffset.streamsOffset().get(1).streamId());
         assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
         assertEquals(300L, streamsOffset.streamsOffset().get(1).endOffset());
+        assertEquals(object0DataTs, manager.brokersMetadata().get(BROKER0).walObjects().get(2L).dataTimeInMs());
 
         // 9. verify compacted wal objects is removed
         assertEquals(1, manager.brokersMetadata().get(BROKER0).walObjects().size());
@@ -508,7 +511,7 @@
 
     @Test
     public void testCommitWalWithStreamObject() {
-        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
+        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
         Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
 
         // 1. create and open stream_0 and stream_1
@@ -557,7 +560,7 @@
 
     @Test
     public void testCommitStreamObject() {
-        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
+        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
         Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
 
         // 1. create and open stream_0 and stream_1
@@ -588,6 +591,7 @@ public void testCommitStreamObject() {
         ControllerResult result0 = manager.commitWALObject(commitRequest0);
         assertEquals(Errors.NONE.code(), result0.response().errorCode());
         replay(manager, result0.records());
+        long object0DataTs = manager.streamsMetadata().get(STREAM1).streamObjects().get(1L).dataTimeInMs();
 
         // 3. commit a wal with stream_0 and a stream object with stream_1 that is split out from wal
         List streamRanges1 = List.of(
@@ -613,6 +617,7 @@ public void testCommitStreamObject() {
         ControllerResult result1 = manager.commitWALObject(commitRequest1);
         assertEquals(Errors.NONE.code(), result1.response().errorCode());
         replay(manager, result1.records());
+        long object1DataTs = manager.streamsMetadata().get(STREAM1).streamObjects().get(3L).dataTimeInMs();
 
         // 4. compact these two stream objects
         CommitStreamObjectRequestData streamObjectRequest = new CommitStreamObjectRequestData()
@@ -650,8 +655,9 @@ public void testCommitStreamObject() {
         assertEquals(Errors.STREAM_INNER_ERROR.code(), result2.response().errorCode());
         replay(manager, result2.records());
 
-        // 6. verify stream objects
+        // 7. verify stream objects
         assertEquals(1, manager.streamsMetadata().get(STREAM1).streamObjects().size());
+        assertEquals(object0DataTs, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).dataTimeInMs());
         assertEquals(4L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).objectId());
         assertEquals(0L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).streamOffsetRange().getStartOffset());
         assertEquals(400L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).streamOffsetRange().getEndOffset());
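The assertions added to the controller tests above pin down one intended invariant: after compaction, the resulting object appears to carry forward the dataTimeInMs of the first source object it replaced (object 0 in both the WAL and stream-object tests). A condensed restatement of that expectation, reusing the identifiers the tests themselves use and adding nothing beyond what they assert:

    // Sketch only: condensed from the assertions above, not additional behaviour.
    long sourceDataTs = manager.brokersMetadata().get(BROKER0).walObjects().get(0L).dataTimeInMs();
    // ... a compacted WAL object 2L is committed from source objects 0L and 1L ...
    assert manager.brokersMetadata().get(BROKER0).walObjects().get(2L).dataTimeInMs() == sourceDataTs;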
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index ecbb1fec26..96bf543dfb 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.metadata.stream.InRangeObjects;
 import org.apache.kafka.metadata.stream.RangeMetadata;
 import org.apache.kafka.metadata.stream.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3StreamConstant;
 import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.apache.kafka.metadata.stream.S3StreamObject;
 import org.apache.kafka.metadata.stream.S3WALObject;
@@ -119,9 +120,9 @@ public void testGetObjects() {
             3, new RangeMetadata(STREAM0, 3L, 3, 420L, 500L, BROKER1),
             4, new RangeMetadata(STREAM0, 4L, 4, 500L, 600L, BROKER0));
         Map streamObjects = Map.of(
-            8L, new S3StreamObject(8, GB, STREAM0, 10L, 100L),
-            9L, new S3StreamObject(9, GB, STREAM0, 200L, 300L),
-            10L, new S3StreamObject(10, GB, STREAM0, 300L, 400L));
+            8L, new S3StreamObject(8, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS),
+            9L, new S3StreamObject(9, STREAM0, 200L, 300L, S3StreamConstant.INVALID_TS),
+            10L, new S3StreamObject(10, STREAM0, 300L, 400L, S3StreamConstant.INVALID_TS));
         S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 4, 10, ranges, streamObjects);
         S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), Map.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage));
@@ -142,41 +143,41 @@ public void testGetObjects() {
         assertEquals(12, objects.objects().size());
         List expectedObjectIds = List.of(
             8L, 0L, 1L, 5L, 6L, 2L, 9L, 10L, 3L, 7L, 3L, 4L);
-        assertEquals(expectedObjectIds, objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+        assertEquals(expectedObjectIds, objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
 
         // 4. search stream_0 in [20, 550)
         objects = streamsImage.getObjects(STREAM0, 20, 550, Integer.MAX_VALUE);
         assertEquals(20, objects.startOffset());
         assertEquals(600, objects.endOffset());
         assertEquals(12, objects.objects().size());
-        assertEquals(expectedObjectIds, objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+        assertEquals(expectedObjectIds, objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
 
         // 5. search stream_0 in [20, 550) with limit 5
         objects = streamsImage.getObjects(STREAM0, 20, 550, 5);
         assertEquals(20, objects.startOffset());
         assertEquals(180, objects.endOffset());
         assertEquals(5, objects.objects().size());
-        assertEquals(expectedObjectIds.subList(0, 5), objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+        assertEquals(expectedObjectIds.subList(0, 5), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
 
         // 6. search stream_0 in [400, 520)
         objects = streamsImage.getObjects(STREAM0, 400, 520, Integer.MAX_VALUE);
         assertEquals(400, objects.startOffset());
         assertEquals(520, objects.endOffset());
         assertEquals(3, objects.objects().size());
-        assertEquals(expectedObjectIds.subList(8, 11), objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+        assertEquals(expectedObjectIds.subList(8, 11), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
 
         // 7. search stream_0 in [401, 519)
         objects = streamsImage.getObjects(STREAM0, 401, 519, Integer.MAX_VALUE);
         assertEquals(401, objects.startOffset());
         assertEquals(520, objects.endOffset());
         assertEquals(3, objects.objects().size());
-        assertEquals(expectedObjectIds.subList(8, 11), objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+        assertEquals(expectedObjectIds.subList(8, 11), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
 
         // 8. search stream_0 in [399, 521)
         objects = streamsImage.getObjects(STREAM0, 399, 521, Integer.MAX_VALUE);
         assertEquals(399, objects.startOffset());
         assertEquals(600, objects.endOffset());
         assertEquals(5, objects.objects().size());
-        assertEquals(expectedObjectIds.subList(7, 12), objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+        assertEquals(expectedObjectIds.subList(7, 12), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
     }
 }
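Based on the getObjects()/InRangeObjects behaviour these image-test assertions exercise (a limited fetch returns at most `limit` objects and reports the end offset it actually reached), a hypothetical paging loop over a stream's objects might look like the following; the loop itself is illustrative and not part of the patch:

    // Sketch only: paging is hypothetical, built on the semantics asserted above.
    long nextOffset = 20L;
    final long targetOffset = 550L;
    while (nextOffset < targetOffset) {
        InRangeObjects page = streamsImage.getObjects(STREAM0, nextOffset, targetOffset, 5);
        for (S3ObjectMetadata object : page.objects()) {
            // consume object.objectId(), object.startOffset(), object.endOffset() ...
        }
        nextOffset = page.endOffset();   // resume the next fetch where this page stopped
    }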