Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,26 +286,10 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
return ControllerResult.of(Collections.emptyList(), resp);
}
records.addAll(commitResult.records());
List<S3ObjectStreamIndex> indexes = new ArrayList<>(streamRanges.size());
streamRanges.stream().filter(range -> !failedStreamIds.contains(range.streamId())).forEach(range -> {
// build WAL object
long streamId = range.streamId();
long startOffset = range.startOffset();
long endOffset = range.endOffset();
indexes.add(new S3ObjectStreamIndex(objectId, startOffset, endOffset));
// TODO: support lazy flush range's end offset
// update range's offset
S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
RangeMetadata oldRange = streamMetadata.ranges.get(streamMetadata.currentRangeIndex());
RangeRecord record = new RangeRecord()
.setStreamId(streamId)
.setBrokerId(brokerId)
.setEpoch(oldRange.epoch())
.setRangeIndex(oldRange.rangeIndex())
.setStartOffset(oldRange.startOffset())
.setEndOffset(endOffset);
records.add(new ApiMessageAndVersion(record, (short) 0));
});
List<S3ObjectStreamIndex> indexes = streamRanges.stream()
.filter(range -> !failedStreamIds.contains(range.streamId()))
.map(range -> new S3ObjectStreamIndex(range.streamId(), range.startOffset(), range.endOffset()))
.collect(Collectors.toList());
// update broker's wal object
BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId);
if (brokerMetadata == null) {
Expand Down Expand Up @@ -394,11 +378,33 @@ public void replay(WALObjectRecord record) {
log.error("broker {} not exist when replay wal object record {}", brokerId, record);
return;
}

// create wal object
Map<Long, List<S3ObjectStreamIndex>> indexMap = streamIndexes
.stream()
.map(S3ObjectStreamIndex::of)
.collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId));
brokerMetadata.walObjects.add(new S3WALObject(objectId, brokerId, indexMap));

// update range
record.streamsIndex().forEach(index -> {
long streamId = index.streamId();
S3StreamMetadata metadata = this.streamsMetadata.get(streamId);
if (metadata == null) {
// ignore it
return;
}
RangeMetadata rangeMetadata = metadata.ranges().get(metadata.currentRangeIndex.get());
if (rangeMetadata == null) {
// ignore it
return;
}
if (rangeMetadata.endOffset() != index.startOffset()) {
// ignore it
return;
}
rangeMetadata.setEndOffset(index.endOffset());
});
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.kafka.image;

import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metadata.AdvanceRangeRecord;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
Expand All @@ -36,6 +38,8 @@ public class S3StreamMetadataDelta {
private long newStartOffset;
private long newEpoch;

private int activeRangeIndex = -1;

private final Map<Integer/*rangeIndex*/, RangeMetadata> changedRanges = new HashMap<>();
private final Set<Integer/*rangeIndex*/> removedRanges = new HashSet<>();
private final Map<Long/*objectId*/, S3StreamObject> changedS3StreamObjects = new HashMap<>();
Expand All @@ -46,6 +50,7 @@ public S3StreamMetadataDelta(S3StreamMetadataImage image) {
this.newEpoch = image.getEpoch();
this.streamId = image.getStreamId();
this.newStartOffset = image.getStartOffset();
this.activeRangeIndex = image.getRanges().keySet().stream().sorted(Comparator.reverseOrder()).findFirst().orElse(-1);
}
public void replay(S3StreamRecord record) {
this.streamId = record.streamId();
Expand All @@ -57,12 +62,16 @@ public void replay(RangeRecord record) {
changedRanges.put(record.rangeIndex(), RangeMetadata.of(record));
// new add or update, so remove from removedRanges
removedRanges.remove(record.rangeIndex());
this.activeRangeIndex = record.rangeIndex();
}

/**
 * Applies a RemoveRangeRecord: marks the range index as removed and drops
 * any pending change buffered for it. If the removed range was the active
 * one, the delta no longer has an active range.
 */
public void replay(RemoveRangeRecord record) {
    int rangeIndex = record.rangeIndex();
    // A removal supersedes any buffered add/update for the same index.
    changedRanges.remove(rangeIndex);
    removedRanges.add(rangeIndex);
    if (activeRangeIndex == rangeIndex) {
        activeRangeIndex = -1;
    }
}

public void replay(S3StreamObjectRecord record) {
Expand All @@ -77,6 +86,28 @@ public void replay(RemoveS3StreamObjectRecord record) {
changedS3StreamObjects.remove(record.objectId());
}

/**
 * Applies an AdvanceRangeRecord: pushes the end offset of the stream's
 * active range forward. The record is silently dropped when there is no
 * active range or when its start offset does not line up with the range's
 * current end offset (the advance must be contiguous).
 */
public void replay(AdvanceRangeRecord record) {
    long advanceFrom = record.startOffset();
    long advanceTo = record.endOffset();
    // Resolve the active range: a pending change in this delta wins over the base image.
    RangeMetadata active = changedRanges.get(activeRangeIndex);
    if (active == null) {
        active = image.getRanges().get(activeRangeIndex);
    }
    if (active == null || active.endOffset() != advanceFrom) {
        // No active range, or a non-contiguous advance — ignore the record.
        return;
    }
    RangeMetadata advanced = new RangeMetadata(
        streamId, active.epoch(), active.rangeIndex(), active.startOffset(), advanceTo, active.brokerId());
    changedRanges.put(activeRangeIndex, advanced);
}

public S3StreamMetadataImage apply() {
Map<Integer, RangeMetadata> newRanges = new HashMap<>(image.getRanges());
// add all new changed ranges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metadata.AdvanceRangeRecord;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
import org.apache.kafka.common.metadata.RangeRecord;
Expand Down Expand Up @@ -103,6 +104,11 @@ public void replay(RemoveS3StreamObjectRecord record) {

/**
 * Applies a WALObjectRecord: records the WAL object against its broker's
 * delta, then advances the active range of every stream the object covers
 * by replaying a synthetic AdvanceRangeRecord per stream index.
 */
public void replay(WALObjectRecord record) {
    // Register the WAL object on the owning broker first.
    getOrCreateBrokerStreamMetadataDelta(record.brokerId()).replay(record);
    // Then move each covered stream's active range end offset forward.
    record.streamsIndex().forEach(streamIndex -> {
        AdvanceRangeRecord advance = new AdvanceRangeRecord()
            .setStartOffset(streamIndex.startOffset())
            .setEndOffset(streamIndex.endOffset());
        getOrCreateStreamMetadataDelta(streamIndex.streamId()).replay(advance);
    });
}

public void replay(RemoveWALObjectRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public int brokerId() {
return brokerId;
}

// Mutates the range's end offset in place; used when replaying records that
// advance a range (e.g. on WAL object commit). NOTE(review): this makes
// RangeMetadata mutable — confirm callers never share instances across threads.
public void setEndOffset(long endOffset) {
this.endOffset = endOffset;
}

public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(new RangeRecord()
.setStreamId(streamId)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 515,
"type": "metadata",
"name": "AdvanceRangeRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "StartOffset",
"type": "int64",
"versions": "0+",
"about": "The start offset of the range"
},
{
"name": "EndOffset",
"type": "int64",
"versions": "0+",
"about": "The end offset of the range"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metadata.AdvanceRangeRecord;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
Expand Down Expand Up @@ -83,22 +84,17 @@ public void testRanges() {
S3StreamMetadataImage image2 = new S3StreamMetadataImage(
STREAM0, 1L, 0L,
Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Map.of());
S3StreamMetadataImage image = delta1.apply();
assertEquals(image2, delta1.apply());
testToImageAndBack(image2);

// 3. seal range 0_0, broker1 is the new leader, and create range0_1
// 3. advance range 0_0, broker1 is the new leader, and create range0_1
List<ApiMessageAndVersion> delta2Records = new ArrayList<>();
S3StreamMetadataDelta delta2 = new S3StreamMetadataDelta(image2);
delta2Records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(STREAM0)
.setEpoch(2L)
.setStartOffset(0L), (short) 0));
delta2Records.add(new ApiMessageAndVersion(new RangeRecord()
.setStreamId(STREAM0)
.setRangeIndex(0)
.setEpoch(1L)
.setBrokerId(BROKER0)
delta2Records.add(new ApiMessageAndVersion(new AdvanceRangeRecord()
.setStartOffset(0L)
.setEndOffset(100L), (short) 0));
delta2Records.add(new ApiMessageAndVersion(new RangeRecord()
Expand Down