diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 18820a554b..93d3648c6a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -286,26 +286,10 @@ public ControllerResult commitWALObject(CommitWALOb return ControllerResult.of(Collections.emptyList(), resp); } records.addAll(commitResult.records()); - List indexes = new ArrayList<>(streamRanges.size()); - streamRanges.stream().filter(range -> !failedStreamIds.contains(range.streamId())).forEach(range -> { - // build WAL object - long streamId = range.streamId(); - long startOffset = range.startOffset(); - long endOffset = range.endOffset(); - indexes.add(new S3ObjectStreamIndex(objectId, startOffset, endOffset)); - // TODO: support lazy flush range's end offset - // update range's offset - S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); - RangeMetadata oldRange = streamMetadata.ranges.get(streamMetadata.currentRangeIndex()); - RangeRecord record = new RangeRecord() - .setStreamId(streamId) - .setBrokerId(brokerId) - .setEpoch(oldRange.epoch()) - .setRangeIndex(oldRange.rangeIndex()) - .setStartOffset(oldRange.startOffset()) - .setEndOffset(endOffset); - records.add(new ApiMessageAndVersion(record, (short) 0)); - }); + List indexes = streamRanges.stream() + .filter(range -> !failedStreamIds.contains(range.streamId())) + .map(range -> new S3ObjectStreamIndex(range.streamId(), range.startOffset(), range.endOffset())) + .collect(Collectors.toList()); // update broker's wal object BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId); if (brokerMetadata == null) { @@ -394,11 +378,33 @@ public void replay(WALObjectRecord record) { log.error("broker {} not exist when replay wal object record {}", brokerId, record); return; } + + // create wal object Map> indexMap = streamIndexes .stream() .map(S3ObjectStreamIndex::of) .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId)); brokerMetadata.walObjects.add(new S3WALObject(objectId, brokerId, indexMap)); + + // update range + record.streamsIndex().forEach(index -> { + long streamId = index.streamId(); + S3StreamMetadata metadata = this.streamsMetadata.get(streamId); + if (metadata == null) { + // ignore it + return; + } + RangeMetadata rangeMetadata = metadata.ranges().get(metadata.currentRangeIndex.get()); + if (rangeMetadata == null) { + // ignore it + return; + } + if (rangeMetadata.endOffset() != index.startOffset()) { + // ignore it + return; + } + rangeMetadata.setEndOffset(index.endOffset()); + }); } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java index c823ba3f6d..1527b8c3df 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java @@ -17,10 +17,12 @@ package org.apache.kafka.image; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.kafka.common.metadata.AdvanceRangeRecord; import org.apache.kafka.common.metadata.RangeRecord; import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; @@ -36,6 +38,8 @@ public class S3StreamMetadataDelta { private long newStartOffset; private long newEpoch; + private int activeRangeIndex = -1; + private final Map changedRanges = new HashMap<>(); private final Set removedRanges = new HashSet<>(); private final Map changedS3StreamObjects = new HashMap<>(); @@ -46,6 +50,7 @@ public S3StreamMetadataDelta(S3StreamMetadataImage image) { this.newEpoch = image.getEpoch(); this.streamId = image.getStreamId(); this.newStartOffset = image.getStartOffset(); + this.activeRangeIndex = image.getRanges().keySet().stream().sorted(Comparator.reverseOrder()).findFirst().orElse(-1); } public void replay(S3StreamRecord record) { this.streamId = record.streamId(); @@ -57,12 +62,16 @@ public void replay(RangeRecord record) { changedRanges.put(record.rangeIndex(), RangeMetadata.of(record)); // new add or update, so remove from removedRanges removedRanges.remove(record.rangeIndex()); + this.activeRangeIndex = record.rangeIndex(); } public void replay(RemoveRangeRecord record) { removedRanges.add(record.rangeIndex()); // new remove, so remove from changedRanges changedRanges.remove(record.rangeIndex()); + if (record.rangeIndex() == activeRangeIndex) { + activeRangeIndex = -1; + } } public void replay(S3StreamObjectRecord record) { @@ -77,6 +86,28 @@ public void replay(RemoveS3StreamObjectRecord record) { changedS3StreamObjects.remove(record.objectId()); } + public void replay(AdvanceRangeRecord record) { + long startOffset = record.startOffset(); + long newEndOffset = record.endOffset(); + // check active range + RangeMetadata metadata = this.changedRanges.get(activeRangeIndex); + if (metadata == null) { + metadata = this.image.getRanges().get(activeRangeIndex); + } + if (metadata == null) { + // ignore it + return; + } + if (startOffset != metadata.endOffset()) { + // ignore it + return; + } + // update the endOffset + this.changedRanges.put(activeRangeIndex, new RangeMetadata( + streamId, metadata.epoch(), metadata.rangeIndex(), metadata.startOffset(), newEndOffset, metadata.brokerId() + )); + } + public S3StreamMetadataImage apply() { Map newRanges = new HashMap<>(image.getRanges()); // add all new changed ranges diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java index bcf5a206c6..9b01d63e1c 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.kafka.common.metadata.AdvanceRangeRecord; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; import org.apache.kafka.common.metadata.BrokerWALMetadataRecord; import org.apache.kafka.common.metadata.RangeRecord; @@ -103,6 +104,11 @@ public void replay(RemoveS3StreamObjectRecord record) { public void replay(WALObjectRecord record) { getOrCreateBrokerStreamMetadataDelta(record.brokerId()).replay(record); + record.streamsIndex().forEach(index -> { + getOrCreateStreamMetadataDelta(index.streamId()).replay(new AdvanceRangeRecord() + .setStartOffset(index.startOffset()) + .setEndOffset(index.endOffset())); + }); } public void replay(RemoveWALObjectRecord record) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java index dcfadedf6e..6fc0e7125b 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java @@ -69,6 +69,10 @@ public int brokerId() { return brokerId; } + public void setEndOffset(long endOffset) { + this.endOffset = endOffset; + } + public ApiMessageAndVersion toRecord() { return new ApiMessageAndVersion(new RangeRecord() .setStreamId(streamId) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/SimplifiedS3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/SimplifiedS3Object.java deleted file mode 100644 index f367f8c7b2..0000000000 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/SimplifiedS3Object.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.metadata.stream; - -import org.apache.kafka.common.metadata.S3ObjectRecord; -import org.apache.kafka.server.common.ApiMessageAndVersion; - -/** - * Simplified S3 object metadata, only be used in metadata cache of broker. - */ -public class SimplifiedS3Object { - private final long objectId; - private final S3ObjectState state; - - public SimplifiedS3Object(final long objectId, final S3ObjectState state) { - this.objectId = objectId; - this.state = state; - } - - public long objectId() { - return objectId; - } - - public S3ObjectState state() { - return state; - } - - public ApiMessageAndVersion toRecord() { - return new ApiMessageAndVersion(new S3ObjectRecord(). - setObjectId(objectId). - setObjectState((byte) state.ordinal()), (short) 0); - } - - public static SimplifiedS3Object of(final S3ObjectRecord record) { - return new SimplifiedS3Object(record.objectId(), S3ObjectState.fromByte(record.objectState())); - } -} diff --git a/metadata/src/main/resources/common/metadata/AdvanceRangeRecord.json b/metadata/src/main/resources/common/metadata/AdvanceRangeRecord.json new file mode 100644 index 0000000000..f9e50cd59a --- /dev/null +++ b/metadata/src/main/resources/common/metadata/AdvanceRangeRecord.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 515, + "type": "metadata", + "name": "AdvanceRangeRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the range" + }, + { + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "The end offset of the range" + } + ] +} \ No newline at end of file diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java index faa9d0e7db..4c2ed0ab32 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.kafka.common.metadata.AdvanceRangeRecord; import org.apache.kafka.common.metadata.RangeRecord; import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; @@ -83,22 +84,17 @@ public void testRanges() { S3StreamMetadataImage image2 = new S3StreamMetadataImage( STREAM0, 1L, 0L, Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Map.of()); - S3StreamMetadataImage image = delta1.apply(); assertEquals(image2, delta1.apply()); testToImageAndBack(image2); - // 3. seal range 0_0, broker1 is the new leader, and create range0_1 + // 3. advance range 0_0, broker1 is the new leader, and create range0_1 List delta2Records = new ArrayList<>(); S3StreamMetadataDelta delta2 = new S3StreamMetadataDelta(image2); delta2Records.add(new ApiMessageAndVersion(new S3StreamRecord() .setStreamId(STREAM0) .setEpoch(2L) .setStartOffset(0L), (short) 0)); - delta2Records.add(new ApiMessageAndVersion(new RangeRecord() - .setStreamId(STREAM0) - .setRangeIndex(0) - .setEpoch(1L) - .setBrokerId(BROKER0) + delta2Records.add(new ApiMessageAndVersion(new AdvanceRangeRecord() .setStartOffset(0L) .setEndOffset(100L), (short) 0)); delta2Records.add(new ApiMessageAndVersion(new RangeRecord()