Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import com.automq.stream.s3.Config;
import com.automq.stream.s3.S3Storage;
import com.automq.stream.s3.S3StreamClient;
import com.automq.stream.s3.cache.DefaultS3BlockCache;
import com.automq.stream.s3.cache.S3BlockCache;
import com.automq.stream.s3.cache.blockcache.StreamReaders;
import com.automq.stream.s3.compact.CompactionManager;
import com.automq.stream.s3.failover.Failover;
import com.automq.stream.s3.failover.FailoverRequest;
Expand Down Expand Up @@ -95,7 +95,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) {
this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext);
this.streamManager = new ControllerStreamManager(this.metadataManager, this.requestSender, kafkaConfig);
this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, kafkaConfig);
this.blockCache = new DefaultS3BlockCache(this.config, objectManager, s3Operator);
this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, s3Operator);
this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionS3Operator);
this.writeAheadLog = BlockWALService.builder(this.config.walPath(), this.config.walCapacity()).config(this.config).build();
this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, s3Operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public CompletableFuture<List<S3ObjectMetadata>> getObjects(long streamId, long
return CompletableFuture.failedFuture(new UnsupportedOperationException());
}

@Override
public boolean isObjectExist(long l) {
throw new UnsupportedOperationException();
}

public CompletableFuture<List<S3ObjectMetadata>> getServerObjects() {
return CompletableFuture.failedFuture(new UnsupportedOperationException());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.image.S3StreamsMetadataImage;
import org.apache.kafka.metadata.stream.InRangeObjects;
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.slf4j.Logger;
Expand Down Expand Up @@ -177,6 +178,14 @@ public long endOffset() {
}
}

public boolean isObjectExist(long objectId) {
S3Object object = objectsImage.getObjectMetadata(objectId);
if (object == null) {
return false;
}
return object.getS3ObjectState() == S3ObjectState.COMMITTED;
}

// must access thread safe
private CompletableFuture<Void> pendingFetch() {
GetObjectsTask task = new GetObjectsTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public CompletableFuture<List<S3ObjectMetadata>> getObjects(long streamId, long
});
}

@Override
public boolean isObjectExist(long objectId) {
return this.metadataManager.isObjectExist(objectId);
}

@Override
public CompletableFuture<List<S3ObjectMetadata>> getServerObjects() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public Optional<ByteBuf> get(String filePath, long position, int length) {
if (entry.getKey() + entry.getValue().dataLength < position + length) {
return Optional.empty();
}
lru.touch(new Key(filePath, cacheStartPosition));
lru.touchIfExist(new Key(filePath, cacheStartPosition));
MappedByteBuffer cacheByteBuffer = this.cacheByteBuffer.duplicate();
long nextPosition = position;
int remaining = length;
Expand Down
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,6 @@ extLibs += [
]
branchVersions += [
s3stream: {
require "1.0.4-s3stream"
branch = "main"
}
]