Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.controller.stream" />
</subpackage>

<subpackage name="metadata">
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public ByteBuffer rawPayload() {
StreamManager streamManager;
ObjectManager objectManager;
S3StreamClient streamClient;
private final static long MAX_APPENDED_OFFSET = 1000;
private final static long MAX_APPENDED_OFFSET = 200;

Random random = new Random();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ public class S3StreamConstant {

public static final long INIT_END_OFFSET = 0L;

public static final long INVALID_STREAM_ID = -1L;

public static final long INVALID_OBJECT_ID = -1L;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.kafka.image;

import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -30,15 +29,17 @@
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.StreamState;

public class S3StreamMetadataDelta {

private final S3StreamMetadataImage image;

private long streamId;
private long newStartOffset;
private long newEpoch;

private int activeRangeIndex = -1;
private StreamState currentState;
private int currentRangeIndex;

private final Map<Integer/*rangeIndex*/, RangeMetadata> changedRanges = new HashMap<>();
private final Set<Integer/*rangeIndex*/> removedRanges = new HashSet<>();
Expand All @@ -50,28 +51,28 @@ public S3StreamMetadataDelta(S3StreamMetadataImage image) {
this.newEpoch = image.getEpoch();
this.streamId = image.getStreamId();
this.newStartOffset = image.getStartOffset();
this.activeRangeIndex = image.getRanges().keySet().stream().sorted(Comparator.reverseOrder()).findFirst().orElse(-1);
this.currentState = image.state();
this.currentRangeIndex = image.rangeIndex();
}

public void replay(S3StreamRecord record) {
this.streamId = record.streamId();
this.newEpoch = record.epoch();
this.newStartOffset = record.startOffset();
this.currentState = StreamState.fromByte(record.streamState());
this.currentRangeIndex = record.rangeIndex();
}

public void replay(RangeRecord record) {
changedRanges.put(record.rangeIndex(), RangeMetadata.of(record));
// new add or update, so remove from removedRanges
removedRanges.remove(record.rangeIndex());
this.activeRangeIndex = record.rangeIndex();
}

public void replay(RemoveRangeRecord record) {
removedRanges.add(record.rangeIndex());
// new remove, so remove from changedRanges
changedRanges.remove(record.rangeIndex());
if (record.rangeIndex() == activeRangeIndex) {
activeRangeIndex = -1;
}
}

public void replay(S3StreamObjectRecord record) {
Expand All @@ -89,10 +90,10 @@ public void replay(RemoveS3StreamObjectRecord record) {
public void replay(AdvanceRangeRecord record) {
long startOffset = record.startOffset();
long newEndOffset = record.endOffset();
// check active range
RangeMetadata metadata = this.changedRanges.get(activeRangeIndex);
// check current range
RangeMetadata metadata = this.changedRanges.get(currentRangeIndex);
if (metadata == null) {
metadata = this.image.getRanges().get(activeRangeIndex);
metadata = this.image.getRanges().get(currentRangeIndex);
}
if (metadata == null) {
// ignore it
Expand All @@ -103,7 +104,7 @@ public void replay(AdvanceRangeRecord record) {
return;
}
// update the endOffset
this.changedRanges.put(activeRangeIndex, new RangeMetadata(
this.changedRanges.put(currentRangeIndex, new RangeMetadata(
streamId, metadata.epoch(), metadata.rangeIndex(), metadata.startOffset(), newEndOffset, metadata.brokerId()
));
}
Expand All @@ -119,7 +120,7 @@ public S3StreamMetadataImage apply() {
newS3StreamObjects.putAll(changedS3StreamObjects);
// remove all removed stream-objects
removedS3StreamObjectIds.forEach(newS3StreamObjects::remove);
return new S3StreamMetadataImage(streamId, newEpoch, newStartOffset, newRanges, newS3StreamObjects);
return new S3StreamMetadataImage(streamId, newEpoch, currentState, currentRangeIndex, newStartOffset, newRanges, newS3StreamObjects);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,43 @@
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.controller.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.stream.StreamState;

public class S3StreamMetadataImage {

public static final S3StreamMetadataImage EMPTY =
new S3StreamMetadataImage(-1L, -1L, -1L, Map.of(), Map.of());
new S3StreamMetadataImage(S3StreamConstant.INVALID_STREAM_ID, S3StreamConstant.INIT_EPOCH, StreamState.CLOSED,
S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(), Map.of());

private final long streamId;

private final long epoch;

private final int rangeIndex;

private final long startOffset;

private final StreamState state;

private final Map<Integer/*rangeIndex*/, RangeMetadata> ranges;

private final Map<Long/*objectId*/, S3StreamObject> streamObjects;

public S3StreamMetadataImage(
long streamId,
long epoch,
long streamId, long epoch, StreamState state,
int rangeIndex,
long startOffset,
Map<Integer, RangeMetadata> ranges,
Map<Long, S3StreamObject> streamObjects) {
this.streamId = streamId;
this.epoch = epoch;
this.state = state;
this.rangeIndex = rangeIndex;
this.startOffset = startOffset;
this.ranges = ranges;
this.streamObjects = streamObjects;
Expand All @@ -56,6 +65,8 @@ public S3StreamMetadataImage(
public void write(ImageWriter writer, ImageWriterOptions options) {
writer.write(0, new S3StreamRecord()
.setStreamId(streamId)
.setRangeIndex(rangeIndex)
.setStreamState(state.toByte())
.setEpoch(epoch)
.setStartOffset(startOffset));
ranges.values().forEach(rangeMetadata -> writer.write(rangeMetadata.toRecord()));
Expand Down Expand Up @@ -87,6 +98,14 @@ public long startOffset() {
return startOffset;
}

public int rangeIndex() {
return rangeIndex;
}

public StreamState state() {
return state;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -98,13 +117,15 @@ public boolean equals(Object o) {
S3StreamMetadataImage that = (S3StreamMetadataImage) o;
return this.streamId == that.streamId &&
this.epoch == that.epoch &&
this.state == that.state &&
this.rangeIndex == that.rangeIndex &&
this.startOffset == that.startOffset &&
this.ranges.equals(that.ranges) &&
this.streamObjects.equals(that.streamObjects);
}

@Override
public int hashCode() {
return Objects.hash(streamId, epoch, startOffset, ranges, streamObjects);
return Objects.hash(streamId, epoch, state, rangeIndex, startOffset, ranges, streamObjects);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public void replay(RemoveRangeRecord record) {

public void replay(S3StreamObjectRecord record) {
getOrCreateStreamMetadataDelta(record.streamId()).replay(record);
getOrCreateStreamMetadataDelta(record.streamId()).replay(new AdvanceRangeRecord()
.setStartOffset(record.startOffset())
.setEndOffset(record.endOffset()));
}

public void replay(RemoveS3StreamObjectRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
import org.apache.kafka.common.metadata.S3StreamObjectRecord;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.controller.stream.S3StreamConstant;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.StreamState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand All @@ -56,12 +58,15 @@ public void testRanges() {
// 1. create stream0
delta0Records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(STREAM0)
.setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX)
.setStreamState(StreamState.OPENED.toByte())
.setStartOffset(S3StreamConstant.INIT_START_OFFSET)
.setEpoch(0L)
.setStartOffset(0L), (short) 0));
RecordTestUtils.replayAll(delta0, delta0Records);
// verify delta and check image's write
S3StreamMetadataImage image1 = new S3StreamMetadataImage(
STREAM0, 0L, 0L, Map.of(), Map.of());
STREAM0, 0L, StreamState.OPENED, S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(), Map.of());
assertEquals(image1, delta0.apply());
testToImageAndBack(image1);

Expand All @@ -71,8 +76,10 @@ public void testRanges() {
S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1);
delta1Records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(STREAM0)
.setEpoch(1L)
.setStartOffset(0L), (short) 0));
.setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX)
.setStreamState(StreamState.OPENED.toByte())
.setStartOffset(S3StreamConstant.INIT_START_OFFSET)
.setEpoch(1L), (short) 0));
delta1Records.add(new ApiMessageAndVersion(new RangeRecord()
.setStreamId(STREAM0)
.setRangeIndex(0)
Expand All @@ -82,7 +89,7 @@ public void testRanges() {
RecordTestUtils.replayAll(delta1, delta1Records);
// verify delta and check image's write
S3StreamMetadataImage image2 = new S3StreamMetadataImage(
STREAM0, 1L, 0L,
STREAM0, 1L, StreamState.OPENED, S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET,
Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Map.of());
assertEquals(image2, delta1.apply());
testToImageAndBack(image2);
Expand All @@ -92,8 +99,10 @@ public void testRanges() {
S3StreamMetadataDelta delta2 = new S3StreamMetadataDelta(image2);
delta2Records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(STREAM0)
.setEpoch(2L)
.setStartOffset(0L), (short) 0));
.setRangeIndex(0)
.setStreamState(StreamState.OPENED.toByte())
.setStartOffset(S3StreamConstant.INIT_START_OFFSET)
.setEpoch(2L), (short) 0));
delta2Records.add(new ApiMessageAndVersion(new AdvanceRangeRecord()
.setStartOffset(0L)
.setEndOffset(100L), (short) 0));
Expand All @@ -107,7 +116,7 @@ public void testRanges() {
RecordTestUtils.replayAll(delta2, delta2Records);
// verify delta and check image's write
S3StreamMetadataImage image3 = new S3StreamMetadataImage(
STREAM0, 2L, 0L, Map.of(
STREAM0, 2L, StreamState.OPENED, 0, 0L, Map.of(
0, new RangeMetadata(STREAM0, 1L, 0, 0, 100, BROKER0),
1, new RangeMetadata(STREAM0, 2L, 1, 100, 100, BROKER1)), Map.of());
assertEquals(image3, delta2.apply());
Expand All @@ -119,22 +128,24 @@ public void testRanges() {
delta3Records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(STREAM0)
.setEpoch(2L)
.setRangeIndex(0)
.setStreamState(StreamState.OPENED.toByte())
.setStartOffset(100L), (short) 0));
delta3Records.add(new ApiMessageAndVersion(new RemoveRangeRecord()
.setStreamId(STREAM0)
.setRangeIndex(0), (short) 0));
RecordTestUtils.replayAll(delta3, delta3Records);
// verify delta and check image's write
S3StreamMetadataImage image4 = new S3StreamMetadataImage(
STREAM0, 2L, 100L, Map.of(
STREAM0, 2L, StreamState.OPENED, 0, 100L, Map.of(
1, new RangeMetadata(STREAM0, 2L, 1, 100L, 100L, BROKER1)), Map.of());
assertEquals(image4, delta3.apply());
}

@Test
public void testStreamObjects() {
S3StreamMetadataImage image0 = new S3StreamMetadataImage(
STREAM0, 0L, 0L, Map.of(), Map.of());
STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of());
List<ApiMessageAndVersion> delta0Records = new ArrayList<>();
S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0);
// 1. create streamObject0 and streamObject1
Expand All @@ -151,7 +162,7 @@ public void testStreamObjects() {
RecordTestUtils.replayAll(delta0, delta0Records);
// verify delta and check image's write
S3StreamMetadataImage image1 = new S3StreamMetadataImage(
STREAM0, 0L, 0L, Map.of(), Map.of(
STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of(
0L, new S3StreamObject(0L, 999, STREAM0, 0L, 100L),
1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
assertEquals(image1, delta0.apply());
Expand All @@ -165,7 +176,7 @@ public void testStreamObjects() {
RecordTestUtils.replayAll(delta1, delta1Records);
// verify delta and check image's write
S3StreamMetadataImage image2 = new S3StreamMetadataImage(
STREAM0, 0L, 0L, Map.of(), Map.of(
STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of(
1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
assertEquals(image2, delta1.apply());
testToImageAndBack(image2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3WALObject;
import org.apache.kafka.metadata.stream.StreamState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -120,7 +121,7 @@ public void testGetObjects() {
8L, new S3StreamObject(8, GB, STREAM0, 10L, 100L),
9L, new S3StreamObject(9, GB, STREAM0, 200L, 300L),
10L, new S3StreamObject(10, GB, STREAM0, 300L, 400L));
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, 10, ranges, streamObjects);
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 4, 10, ranges, streamObjects);
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
Map.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage));

Expand Down