1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -41,6 +41,7 @@
     <allow pkg="software.amazon.awssdk" />
     <allow pkg="com.automq.stream" />
     <allow pkg="com.github.luben.zstd" />
+    <allow pkg="com.google.common.cache" />
 
     <!-- no one depends on the server -->
     <disallow pkg="kafka" />
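Editor's note: the new com.google.common.cache allowance implies Guava's cache is now used somewhere in the stream-metadata code, plausibly for the object-list caching hinted at by the "// TODO: cache the object list for next search" comment further down in this diff. Below is a minimal sketch of the Guava cache API that this rule now permits; the class, key/value types, and tuning values are illustrative assumptions, not code from this PR.

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    import java.util.concurrent.TimeUnit;

    public class ObjectListCacheSketch {
        // Hypothetical cache keyed by stream id; bounds and expiry are illustrative only.
        private final Cache<Long, Object> cache = CacheBuilder.newBuilder()
            .maximumSize(10_000)                   // cap entries to bound memory
            .expireAfterWrite(1, TimeUnit.MINUTES) // drop stale metadata
            .build();

        public Object get(long streamId) {
            return cache.getIfPresent(streamId);   // null when absent or expired
        }

        public void put(long streamId, Object objects) {
            cache.put(streamId, objects);
        }
    }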
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
@@ -306,9 +306,9 @@
     <suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
               files="(QuorumController).java"/>
     <suppress checks="CyclomaticComplexity"
-              files="(ClientQuotasImage|MetadataDelta|QuorumController|ReplicationControlManager).java"/>
+              files="(ClientQuotasImage|MetadataDelta|QuorumController|ReplicationControlManager|S3StreamsMetadataImage).java"/>
     <suppress checks="NPathComplexity"
-              files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager).java"/>
+              files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|S3StreamsMetadataImage).java"/>
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
               files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity"
kafka/log/stream/s3/metadata/StreamMetadataManager.java
@@ -17,6 +17,8 @@
 
 package kafka.log.stream.s3.metadata;
 
+import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metadata.S3StreamConstant;
 import com.automq.stream.s3.metadata.StreamMetadata;
 import com.automq.stream.utils.FutureUtil;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -29,37 +31,28 @@
 import org.apache.kafka.image.S3StreamsMetadataImage;
 import org.apache.kafka.metadata.stream.InRangeObjects;
 import org.apache.kafka.metadata.stream.S3Object;
-import com.automq.stream.s3.metadata.S3ObjectMetadata;
-import com.automq.stream.s3.metadata.S3StreamConstant;
 import org.apache.kafka.metadata.stream.S3StreamObject;
-import com.automq.stream.s3.metadata.StreamOffsetRange;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
 
 import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;
+import static com.automq.stream.utils.FutureUtil.exec;
 
 public class StreamMetadataManager implements InRangeObjectsFetcher {
 
     // TODO: optimize by more suitable concurrent protection
     private final static Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class);
     private final KafkaConfig config;
     private final BrokerServer broker;
-    private final Map<Long/*stream id*/, Map<Long/*end offset*/, List<GetObjectsTask>>> pendingGetObjectsTasks;
+    private final List<GetObjectsTask> pendingGetObjectsTasks;
     private final ExecutorService pendingExecutorService;
     // TODO: we just need the version of streams metadata, not the whole image
     private volatile OffsetAndEpoch version;
@@ -74,8 +67,7 @@ public StreamMetadataManager(BrokerServer broker, KafkaConfig config) {
         this.objectsImage = currentImage.objectsMetadata();
         this.version = currentImage.highestOffsetAndEpoch();
         this.broker.metadataListener().registerMetadataListener(this::onImageChanged);
-        // TODO: optimize by more suitable data structure for pending tasks
-        this.pendingGetObjectsTasks = new HashMap<>();
+        this.pendingGetObjectsTasks = new LinkedList<>();
         this.pendingExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pending-get-objects-task-executor"));
     }
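Editor's note: a tiny standalone illustration (not from the PR) of what the Netty DefaultThreadFactory in the constructor above buys: the single pending-task thread gets a recognizable name in thread dumps, along the lines of "pending-get-objects-task-executor-1-1". The demo class name is hypothetical.

    import io.netty.util.concurrent.DefaultThreadFactory;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ThreadFactoryDemo {
        public static void main(String[] args) {
            // same factory call as the constructor above, with the same pool name
            ExecutorService pool = Executors.newSingleThreadScheduledExecutor(
                new DefaultThreadFactory("pending-get-objects-task-executor"));
            pool.submit(() -> System.out.println(Thread.currentThread().getName()));
            pool.shutdown();
        }
    }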

@@ -89,15 +81,9 @@ private void onImageChanged(MetadataDelta delta, MetadataImage newImage) {
             // update image
             this.streamsImage = newImage.streamsMetadata();
             this.objectsImage = newImage.objectsMetadata();
-            // remove all catch up pending tasks
-            List<GetObjectsTask> retryTasks = removePendingTasks();
 
-            // retry all pending tasks
-            if (retryTasks.isEmpty()) {
-                return;
-            }
             this.pendingExecutorService.submit(() -> {
-                retryPendingTasks(retryTasks);
+                retryPendingTasks();
             });
         }
     }

@@ -116,77 +102,42 @@ public CompletableFuture<List<S3ObjectMetadata>> getStreamSetObjects() {
         }
     }
 
-    // must access thread safe
-    private List<GetObjectsTask> removePendingTasks() {
-        if (this.pendingGetObjectsTasks == null || this.pendingGetObjectsTasks.isEmpty()) {
-            return Collections.emptyList();
-        }
-        Set<Long> pendingStreams = pendingGetObjectsTasks.keySet();
-        List<StreamOffsetRange> pendingStreamsOffsetRange = pendingStreams
-            .stream()
-            .map(streamsImage::offsetRange)
-            .filter(offset -> offset != StreamOffsetRange.INVALID)
-            .collect(Collectors.toList());
-        if (pendingStreamsOffsetRange.isEmpty()) {
-            return Collections.emptyList();
-        }
-        List<GetObjectsTask> retryTasks = new ArrayList<>();
-        pendingStreamsOffsetRange.forEach(offsetRange -> {
-            long streamId = offsetRange.streamId();
-            long endOffset = offsetRange.endOffset();
-            Map<Long, List<GetObjectsTask>> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.get(streamId);
-            if (tasks == null || tasks.isEmpty()) {
-                return;
-            }
-            Iterator<Entry<Long, List<GetObjectsTask>>> iterator =
-                tasks.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Entry<Long, List<GetObjectsTask>> entry = iterator.next();
-                long pendingEndOffset = entry.getKey();
-                if (pendingEndOffset > endOffset) {
-                    break;
-                }
-                iterator.remove();
-                List<GetObjectsTask> getObjectsTasks = entry.getValue();
-                retryTasks.addAll(getObjectsTasks);
-            }
-            if (tasks.isEmpty()) {
-                StreamMetadataManager.this.pendingGetObjectsTasks.remove(streamId);
-            }
-        });
-        return retryTasks;
-    }
-
     @Override
     public synchronized CompletableFuture<InRangeObjects> fetch(long streamId, long startOffset, long endOffset, int limit) {
-        S3StreamMetadataImage streamImage = streamsImage.streamsMetadata().get(streamId);
-        if (streamImage == null) {
-            LOGGER.warn(
-                "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and streamImage is null",
-                streamId, startOffset, endOffset, limit);
-            return CompletableFuture.completedFuture(InRangeObjects.INVALID);
-        }
-        StreamOffsetRange offsetRange = streamImage.offsetRange();
-        if (offsetRange == null || offsetRange == StreamOffsetRange.INVALID) {
-            return CompletableFuture.completedFuture(InRangeObjects.INVALID);
-        }
-        long streamStartOffset = offsetRange.startOffset();
-        long streamEndOffset = offsetRange.endOffset();
-        if (startOffset < streamStartOffset) {
-            LOGGER.warn(
-                "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and startOffset < streamStartOffset: {}",
-                streamId, startOffset, endOffset, limit, streamStartOffset);
-            return CompletableFuture.completedFuture(InRangeObjects.INVALID);
-        }
-        endOffset = endOffset == NOOP_OFFSET ? streamEndOffset : endOffset;
-        if (endOffset > streamEndOffset) {
-            // lag behind, need to wait for cache catch up
-            LOGGER.warn("[FetchObjects]: pending request, stream: {}, startOffset: {}, endOffset: {}, streamEndOffset: {}, limit: {}",
-                streamId, startOffset, endOffset, streamEndOffset, limit);
-            return pendingFetch(streamId, startOffset, endOffset, limit);
-        }
-        long finalEndOffset = endOffset;
-        return FutureUtil.exec(() -> fetch0(streamId, startOffset, finalEndOffset, limit), LOGGER, "fetch");
+        // TODO: cache the object list for next search
+        return exec(() -> fetch0(streamId, startOffset, endOffset, limit), LOGGER, "fetchObjects").thenApply(rst -> {
+            rst.objects().forEach(object -> {
+                S3Object objectMetadata = objectsImage.getObjectMetadata(object.objectId());
+                if (objectMetadata == null) {
+                    // should not happen
+                    LOGGER.error(
+                        "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result",
+                        streamId, startOffset, endOffset, limit);
+                    throw new IllegalStateException("cannot find object metadata for object: " + object.objectId());
+                }
+                object.setObjectSize(objectMetadata.getObjectSize());
+                object.setCommittedTimestamp(objectMetadata.getCommittedTimeInMs());
+            });
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace(
+                    "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}",
+                    streamId, startOffset, endOffset, limit, rst);
+            }
+            return rst;
+        });
     }
 
+    private synchronized CompletableFuture<InRangeObjects> fetch0(long streamId, long startOffset, long endOffset, int limit) {
+        InRangeObjects rst = streamsImage.getObjects(streamId, startOffset, endOffset, limit);
+        if (rst.objects().size() >= limit || rst.endOffset() >= endOffset || rst == InRangeObjects.INVALID) {
+            return CompletableFuture.completedFuture(rst);
+        }
+        LOGGER.info("[FetchObjects],[PENDING],streamId={} startOffset={} endOffset={} limit={}", streamId, startOffset, endOffset, limit);
+        CompletableFuture<Void> pendingCf = pendingFetch();
+        CompletableFuture<InRangeObjects> rstCf = new CompletableFuture<>();
+        FutureUtil.propagate(pendingCf.thenCompose(nil -> fetch0(streamId, startOffset, endOffset, limit)), rstCf);
+        return rstCf.whenComplete((r, ex) -> LOGGER.info("[FetchObjects],[COMPLETE_PENDING],streamId={} startOffset={} endOffset={} limit={}", streamId, startOffset, endOffset, limit));
+    }
+
     public CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
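Editor's note: the rework above replaces the old per-stream, per-end-offset pending-task index with a much simpler scheme. fetch0 returns whatever the local metadata image can satisfy; when the image lags the request (fewer objects than limit and an end offset short of the requested one), the call parks on a CompletableFuture<Void> and re-runs itself once retryPendingTasks() releases it after the next image update. The old design retried only the tasks the new image could satisfy; the new one wakes every waiter and lets the re-run decide, which is simpler at the cost of possible re-parking. Below is a self-contained sketch of that park-and-retry idiom, with simplified, hypothetical names (read, imageEndOffset), not the PR's actual classes.

    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ParkAndRetrySketch {
        private final List<CompletableFuture<Void>> waiters = new LinkedList<>();
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private volatile long imageEndOffset = 0; // stands in for the S3StreamsMetadataImage state

        public CompletableFuture<Long> read(long wantedEndOffset) {
            if (imageEndOffset >= wantedEndOffset) {
                return CompletableFuture.completedFuture(imageEndOffset); // image is fresh enough
            }
            CompletableFuture<Void> parked = new CompletableFuture<>();
            synchronized (waiters) {
                waiters.add(parked);
            }
            // retry from scratch once the image advances; the recursion mirrors fetch0
            return parked.thenCompose(nil -> read(wantedEndOffset));
        }

        public void onImageChanged(long newEndOffset) {
            imageEndOffset = newEndOffset;
            synchronized (waiters) {
                // completeAsync keeps the retries off the metadata-listener thread
                waiters.forEach(w -> w.completeAsync(() -> null, executor));
                waiters.clear();
            }
        }
    }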
@@ -220,85 +171,44 @@ public List<StreamMetadata> getStreamMetadataList(List<Long> streamIds) {
                     continue;
                 }
                 StreamMetadata streamMetadata = new StreamMetadata(streamId, streamImage.getEpoch(),
-                    streamImage.getStartOffset(), streamImage.getEndOffset(), streamImage.state());
+                    streamImage.getStartOffset(), -1L, streamImage.state()) {
+                    @Override
+                    public long endOffset() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
                 streamMetadataList.add(streamMetadata);
             }
             return streamMetadataList;
         }
     }
 
-    // must access thread safe
-    private CompletableFuture<InRangeObjects> pendingFetch(long streamId, long startOffset, long endOffset, int limit) {
-        GetObjectsTask task = GetObjectsTask.of(streamId, startOffset, endOffset, limit);
-        Map<Long, List<GetObjectsTask>> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.computeIfAbsent(task.streamId,
-            k -> new TreeMap<>());
-        List<GetObjectsTask> getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>());
-        getObjectsTasks.add(task);
+    private CompletableFuture<Void> pendingFetch() {
+        GetObjectsTask task = new GetObjectsTask();
+        synchronized (pendingGetObjectsTasks) {
+            pendingGetObjectsTasks.add(task);
+        }
         return task.cf;
     }
 
-    // must access thread safe
-    private synchronized CompletableFuture<InRangeObjects> fetch0(long streamId, long startOffset, long endOffset, int limit) {
-        InRangeObjects cachedInRangeObjects = streamsImage.getObjects(streamId, startOffset, endOffset, limit);
-        if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) {
-            LOGGER.warn(
-                "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result",
-                streamId, startOffset, endOffset, limit);
-            return CompletableFuture.completedFuture(InRangeObjects.INVALID);
-        }
-        // fill the objects' size and committed-timestamp
-        for (S3ObjectMetadata object : cachedInRangeObjects.objects()) {
-            S3Object objectMetadata = objectsImage.getObjectMetadata(object.objectId());
-            if (objectMetadata == null) {
-                // should not happen
-                LOGGER.error(
-                    "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result",
-                    streamId, startOffset, endOffset, limit);
-                return CompletableFuture.completedFuture(InRangeObjects.INVALID);
-            }
-            object.setObjectSize(objectMetadata.getObjectSize());
-            object.setCommittedTimestamp(objectMetadata.getCommittedTimeInMs());
-        }
-        LOGGER.trace(
-            "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}",
-            streamId, startOffset, endOffset, limit, cachedInRangeObjects);
-        return CompletableFuture.completedFuture(cachedInRangeObjects);
-    }
-
-    void retryPendingTasks(List<GetObjectsTask> tasks) {
-        if (tasks == null || tasks.isEmpty()) {
-            return;
-        }
-        LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", tasks.size());
-        tasks.forEach(task -> {
-            long streamId = task.streamId;
-            long startOffset = task.startOffset;
-            long endOffset = task.endOffset;
-            int limit = task.limit;
-            CompletableFuture<InRangeObjects> newCf = this.fetch(streamId, startOffset, endOffset, limit);
-            FutureUtil.propagate(newCf, task.cf);
-        });
+    void retryPendingTasks() {
+        synchronized (pendingGetObjectsTasks) {
+            if (pendingGetObjectsTasks.isEmpty()) {
+                return;
+            }
+            LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", pendingGetObjectsTasks.size());
+            pendingGetObjectsTasks.forEach(t -> t.cf.completeAsync(() -> null, pendingExecutorService));
+            pendingGetObjectsTasks.clear();
+        }
     }
 
     static class GetObjectsTask {
 
-        private final CompletableFuture<InRangeObjects> cf;
-        private final long streamId;
-        private final long startOffset;
-        private final long endOffset;
-        private final int limit;
-
-        public static GetObjectsTask of(long streamId, long startOffset, long endOffset, int limit) {
-            CompletableFuture<InRangeObjects> cf = new CompletableFuture<>();
-            return new GetObjectsTask(cf, streamId, startOffset, endOffset, limit);
-        }
+        private final CompletableFuture<Void> cf;
 
-        private GetObjectsTask(CompletableFuture<InRangeObjects> cf, long streamId, long startOffset, long endOffset, int limit) {
-            this.cf = cf;
-            this.streamId = streamId;
-            this.startOffset = startOffset;
-            this.endOffset = endOffset;
-            this.limit = limit;
+        public GetObjectsTask() {
+            this.cf = new CompletableFuture<>();
        }
    }
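Editor's note: retryPendingTasks() completes waiters with completeAsync(..., pendingExecutorService) rather than plain complete(). That detail matters: non-async dependent stages of a CompletableFuture run on the thread that completes it, so completeAsync keeps the retried fetch0 calls off the metadata-listener thread. A small demo of the difference (class name hypothetical):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CompleteAsyncDemo {
        public static void main(String[] args) throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();

            CompletableFuture<Void> sync = new CompletableFuture<>();
            sync.thenRun(() -> System.out.println("sync stage on: " + Thread.currentThread().getName()));
            sync.complete(null); // dependent stage runs on the completing (caller) thread

            CompletableFuture<Void> async = new CompletableFuture<>();
            async.thenRun(() -> System.out.println("async stage on: " + Thread.currentThread().getName()));
            async.completeAsync(() -> null, executor); // dependent stage runs on the executor

            Thread.sleep(100); // crude wait for the async print (demo only)
            executor.shutdown();
        }
    }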
