Merged
1 change: 1 addition & 0 deletions build.gradle
@@ -1268,6 +1268,7 @@ project(':metadata') {
implementation libs.jacksonJDK8Datatypes
implementation libs.metrics
implementation libs.nettyCommon
implementation libs.zstd
compileOnly libs.log4j
testImplementation libs.junitJupiter
testImplementation libs.jqwik
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -40,6 +40,7 @@
<allow pkg="io.netty" />
<allow pkg="software.amazon.awssdk" />
<allow pkg="com.automq.stream" />
<allow pkg="com.github.luben.zstd" />

<!-- no one depends on the server -->
<disallow pkg="kafka" />
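The two build changes above wire in the zstd-jni library and allow its package through import-control, presumably so the metadata module can compress serialized stream-offset-range data. The call sites are outside this diff; what follows is only a minimal sketch of the com.github.luben.zstd API now permitted, with the payload contents purely hypothetical:

import com.github.luben.zstd.Zstd;

import java.nio.charset.StandardCharsets;

public class ZstdUsageSketch {
    public static void main(String[] args) {
        // Hypothetical payload standing in for a serialized offset-range index.
        byte[] raw = "streamId=1,start=0,end=100".getBytes(StandardCharsets.UTF_8);

        // One-shot compression at the default level; Zstd.compress(src, level) takes an explicit level.
        byte[] compressed = Zstd.compress(raw);

        // One-shot decompression needs the original size, so callers persist it alongside the payload.
        byte[] restored = Zstd.decompress(compressed, raw.length);

        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}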
@@ -107,7 +107,7 @@ public CompletableFuture<List<S3ObjectMetadata>> getStreamSetObjects() {
.map(object -> {
S3Object s3Object = this.objectsImage.getObjectMetadata(object.objectId());
return new S3ObjectMetadata(object.objectId(), object.objectType(),
new ArrayList<>(object.offsetRanges().values()), object.dataTimeInMs(),
object.offsetRangeList(), object.dataTimeInMs(),
s3Object.getCommittedTimeInMs(), s3Object.getObjectSize(),
object.orderId());
})
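This hunk reflects the PR's central data-structure change: S3StreamSetObject previously exposed its ranges as a Map keyed by stream id (offsetRanges()) and now exposes a list sorted by stream id (offsetRangeList()), which callers such as getStreamSetObjects() pass along without copying map values. A rough sketch of what a lookup against the sorted list looks like, using a hypothetical Range record in place of the real StreamOffsetRange class:

import java.util.List;

class OffsetRangeLookupSketch {
    // Hypothetical stand-in for com.automq.stream.s3.metadata.StreamOffsetRange.
    record Range(long streamId, long startOffset, long endOffset) {}

    // A map lookup by stream id becomes a binary search over the sorted list.
    static Range find(List<Range> sorted, long streamId) {
        int lo = 0, hi = sorted.size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            Range r = sorted.get(mid);
            if (r.streamId() < streamId) {
                lo = mid + 1;
            } else if (r.streamId() > streamId) {
                hi = mid - 1;
            } else {
                return r;
            }
        }
        return null; // this stream set object holds no data for the stream
    }
}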
@@ -17,28 +17,28 @@

package kafka.log.stream.s3;

import kafka.log.stream.s3.metadata.StreamMetadataManager;
import com.automq.stream.s3.metadata.S3StreamConstant;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import com.automq.stream.s3.metadata.StreamState;
import kafka.log.stream.s3.metadata.MetadataListener;
import kafka.log.stream.s3.metadata.StreamMetadataManager;
import kafka.server.BrokerServer;
import kafka.server.KafkaConfig;
import kafka.server.metadata.BrokerMetadataListener;
import kafka.server.metadata.KRaftMetadataCache;
import org.apache.kafka.image.DeltaMap;
import org.apache.kafka.image.NodeS3StreamSetObjectMetadataImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.NodeS3StreamSetObjectMetadataImage;
import org.apache.kafka.image.S3ObjectsImage;
import org.apache.kafka.image.S3StreamMetadataImage;
import org.apache.kafka.image.S3StreamsMetadataImage;
import org.apache.kafka.metadata.stream.InRangeObjects;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;
import com.automq.stream.s3.metadata.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3StreamSetObject;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import com.automq.stream.s3.metadata.StreamState;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -47,6 +47,7 @@
import org.mockito.Mockito;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -94,7 +95,7 @@ public void setUp() {
private static MetadataImage image2;

static {
DeltaMap<Long, S3Object> map = new DeltaMap<>(new int[] {});
DeltaMap<Long, S3Object> map = new DeltaMap<>(new int[]{});
map.putAll(Map.of(
0L, new S3Object(0L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED),
1L, new S3Object(1L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED),
@@ -110,10 +111,10 @@ public void setUp() {
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects);

NodeS3StreamSetObjectMetadataImage walMetadataImage0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Map.of(
1L, new S3StreamSetObject(1L, BROKER0, Map.of(
STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L),
2L, new S3StreamSetObject(2L, BROKER0, Map.of(
STREAM2, new StreamOffsetRange(STREAM2, 0L, 100L)), 2L)));
1L, new S3StreamSetObject(1L, BROKER0, List.of(
new StreamOffsetRange(STREAM1, 0L, 100L)), 1L),
2L, new S3StreamSetObject(2L, BROKER0, List.of(
new StreamOffsetRange(STREAM2, 0L, 100L)), 2L)));

S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
Map.of(BROKER0, walMetadataImage0));
@@ -68,7 +68,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -445,41 +444,7 @@ public ControllerResult<TrimStreamResponse> trimStream(int nodeId, long nodeEpoc
if (resp.errorCode() != Errors.NONE.code()) {
return ControllerResult.of(Collections.emptyList(), resp);
}
// remove stream set object or remove stream range in stream set object
// TODO: optimize
this.nodesMetadata.values()
.stream()
.flatMap(entry -> entry.streamSetObjects().values().stream())
.filter(streamSetObject -> streamSetObject.offsetRanges().containsKey(streamId))
.filter(streamSetObject -> streamSetObject.offsetRanges().get(streamId).getEndOffset() <= newStartOffset)
.forEach(streamSetObj -> {
if (streamSetObj.offsetRanges().size() == 1) {
// only this range, but we will remove this range, so now we can remove this stream set object
records.add(new ApiMessageAndVersion(
new RemoveStreamSetObjectRecord()
.setNodeId(streamSetObj.nodeId())
.setObjectId(streamSetObj.objectId()), (short) 0
));
ControllerResult<Boolean> markDestroyResult = this.s3ObjectControlManager.markDestroyObjects(
List.of(streamSetObj.objectId()));
if (!markDestroyResult.response()) {
log.error("[TrimStream] Mark destroy stream set object: {} failed", streamSetObj.objectId());
resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
return;
}
records.addAll(markDestroyResult.records());
return;
}
Map<Long, StreamOffsetRange> newOffsetRange = new HashMap<>(streamSetObj.offsetRanges());
// remove offset range
newOffsetRange.remove(streamId);
records.add(new ApiMessageAndVersion(new S3StreamSetObjectRecord()
.setObjectId(streamSetObj.objectId())
.setNodeId(streamSetObj.nodeId())
.setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList()))
.setDataTimeInMs(streamSetObj.dataTimeInMs())
.setOrderId(streamSetObj.orderId()), (short) 0));
});
// the data in stream set object will be removed by compaction
if (resp.errorCode() != Errors.NONE.code()) {
return ControllerResult.of(Collections.emptyList(), resp);
}
@@ -523,39 +488,7 @@ public ControllerResult<DeleteStreamResponse> deleteStream(int nodeId, long node
return ControllerResult.of(Collections.emptyList(), resp);
}
records.addAll(markDestroyResult.records());
// remove stream set object or remove stream-offset-range in stream set object
this.nodesMetadata.values()
.stream()
.flatMap(entry -> entry.streamSetObjects().values().stream())
.filter(streamsSetObject -> streamsSetObject.offsetRanges().containsKey(streamId))
.forEach(streamSetObj -> {
if (streamSetObj.offsetRanges().size() == 1) {
// only this range, but we will remove this range, so now we can remove this stream set object
records.add(new ApiMessageAndVersion(
new RemoveStreamSetObjectRecord()
.setNodeId(streamSetObj.nodeId())
.setObjectId(streamSetObj.objectId()), (short) 0
));
ControllerResult<Boolean> result = this.s3ObjectControlManager.markDestroyObjects(
List.of(streamSetObj.objectId()));
if (!result.response()) {
log.error("[DeleteStream]: Mark destroy stream set object: {} failed", streamSetObj.objectId());
resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
return;
}
records.addAll(result.records());
return;
}
Map<Long, StreamOffsetRange> newOffsetRange = new HashMap<>(streamSetObj.offsetRanges());
// remove offset range
newOffsetRange.remove(streamId);
records.add(new ApiMessageAndVersion(new S3StreamSetObjectRecord()
.setObjectId(streamSetObj.objectId())
.setNodeId(streamSetObj.nodeId())
.setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList()))
.setDataTimeInMs(streamSetObj.dataTimeInMs())
.setOrderId(streamSetObj.orderId()), (short) 0));
});
// the data in stream set object will be removed by compaction
if (resp.errorCode() != Errors.NONE.code()) {
return ControllerResult.of(Collections.emptyList(), resp);
}
@@ -1043,10 +976,8 @@ public void replay(S3StreamSetObjectRecord record) {
}

// create stream set object
Map<Long, StreamOffsetRange> indexMap = streamIndexes
.stream()
.collect(Collectors.toMap(StreamIndex::streamId, Convertor::to));
nodeMetadata.streamSetObjects().put(objectId, new S3StreamSetObject(objectId, nodeId, indexMap, orderId, dataTs));
List<StreamOffsetRange> ranges = streamIndexes.stream().map(Convertor::to).collect(Collectors.toList());
nodeMetadata.streamSetObjects().put(objectId, new S3StreamSetObject(objectId, nodeId, ranges, orderId, dataTs));

// update range
record.streamsIndex().forEach(index -> {
@@ -19,6 +19,8 @@

import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection;
import com.automq.stream.utils.biniarysearch.ComparableItem;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
@@ -176,25 +178,29 @@ public RangeSearcher(long startOffset, long endOffset, long streamId, int nodeId

private Queue<S3ObjectMetadataWrapper> rangeOfStreamSetObjects() {
NodeS3StreamSetObjectMetadataImage streamSetObjectImage = nodeStreamSetObjectMetadata.get(nodeId);
return streamSetObjectImage.orderList().stream()
.filter(obj -> obj.offsetRanges().containsKey(streamId))
.filter(obj -> {
StreamOffsetRange offsetRange = obj.offsetRanges().get(streamId);
long objectStartOffset = offsetRange.getStartOffset();
long objectEndOffset = offsetRange.getEndOffset();
return objectStartOffset < endOffset && objectEndOffset > startOffset;
}).map(obj -> {
StreamOffsetRange offsetRange = obj.offsetRanges().get(streamId);
long startOffset = offsetRange.getStartOffset();
long endOffset = offsetRange.getEndOffset();
List<StreamOffsetRange> offsetRanges = obj.offsetRanges().values().stream().sorted()
.collect(Collectors.toList());
S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(
obj.objectId(), obj.objectType(), offsetRanges, obj.dataTimeInMs(),
List<S3StreamSetObject> streamSetObjects = streamSetObjectImage.orderList();
Queue<S3ObjectMetadataWrapper> s3ObjectMetadataList = new LinkedList<>();
for (S3StreamSetObject obj : streamSetObjects) {
// TODO: cache the stream offset ranges to accelerate the search
// TODO: cache the last search index, to accelerate the search
List<StreamOffsetRange> ranges = obj.offsetRangeList();
int index = new StreamOffsetRanges(ranges).search(streamId);
if (index < 0) {
continue;
}
StreamOffsetRange range = ranges.get(index);
if (range.getStartOffset() >= endOffset || range.getEndOffset() < startOffset) {
continue;
}
S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(
obj.objectId(), obj.objectType(), ranges, obj.dataTimeInMs(),
obj.orderId());
return new S3ObjectMetadataWrapper(s3ObjectMetadata, startOffset, endOffset);
})
.collect(Collectors.toCollection(LinkedList::new));
s3ObjectMetadataList.add(new S3ObjectMetadataWrapper(s3ObjectMetadata, range.getStartOffset(), range.getEndOffset()));
if (range.getEndOffset() >= endOffset) {
break;
}
}
return s3ObjectMetadataList;
}

private Queue<S3ObjectMetadataWrapper> rangeOfStreamObjects() {
@@ -340,4 +346,34 @@ public String toString() {
map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) +
'}';
}

static class StreamOffsetRanges extends AbstractOrderedCollection<Long> {
private final List<StreamOffsetRange> ranges;

public StreamOffsetRanges(List<StreamOffsetRange> ranges) {
this.ranges = ranges;
}

@Override
protected int size() {
return ranges.size();
}

@Override
protected ComparableItem<Long> get(int index) {
StreamOffsetRange range = ranges.get(index);
return new ComparableItem<>() {
@Override
public boolean isLessThan(Long o) {
return range.getStreamId() < o;
}

@Override
public boolean isGreaterThan(Long o) {
return range.getStreamId() > o;
}
};
}

}
}
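As exercised in rangeOfStreamSetObjects() above, StreamOffsetRanges.search(streamId) (inherited from AbstractOrderedCollection) returns a non-negative index when the sorted range list covers the stream and a negative value otherwise. A minimal usage sketch along the lines of that caller:

List<StreamOffsetRange> ranges = obj.offsetRangeList(); // sorted by stream id
int index = new StreamOffsetRanges(ranges).search(streamId);
if (index >= 0) {
    StreamOffsetRange range = ranges.get(index);
    // intersect [range.getStartOffset(), range.getEndOffset()) with the queried offsets
}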