From 13a9e5db99d202b2e34da51e89ce34929ec37450 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Tue, 23 Apr 2024 10:04:26 +0800 Subject: [PATCH] feat(s3stream): switch to branch main Signed-off-by: Robin Han --- .../main/scala/kafka/log/stream/s3/DefaultS3Client.java | 4 ++-- .../log/stream/s3/failover/DefaultFailoverFactory.java | 5 +++++ .../log/stream/s3/metadata/StreamMetadataManager.java | 9 +++++++++ .../log/stream/s3/objects/ControllerObjectManager.java | 5 +++++ .../scala/kafka/log/streamaspect/cache/FileCache.java | 2 +- gradle/dependencies.gradle | 2 +- 6 files changed, 23 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java index 4232deb5f4..64592ce6a0 100644 --- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java @@ -19,8 +19,8 @@ import com.automq.stream.s3.Config; import com.automq.stream.s3.S3Storage; import com.automq.stream.s3.S3StreamClient; -import com.automq.stream.s3.cache.DefaultS3BlockCache; import com.automq.stream.s3.cache.S3BlockCache; +import com.automq.stream.s3.cache.blockcache.StreamReaders; import com.automq.stream.s3.compact.CompactionManager; import com.automq.stream.s3.failover.Failover; import com.automq.stream.s3.failover.FailoverRequest; @@ -95,7 +95,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) { this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext); this.streamManager = new ControllerStreamManager(this.metadataManager, this.requestSender, kafkaConfig); this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, kafkaConfig); - this.blockCache = new DefaultS3BlockCache(this.config, objectManager, s3Operator); + this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, s3Operator); this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionS3Operator); this.writeAheadLog = BlockWALService.builder(this.config.walPath(), this.config.walCapacity()).config(this.config).build(); this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, s3Operator); diff --git a/core/src/main/scala/kafka/log/stream/s3/failover/DefaultFailoverFactory.java b/core/src/main/scala/kafka/log/stream/s3/failover/DefaultFailoverFactory.java index ab75b385df..4f442b3054 100644 --- a/core/src/main/scala/kafka/log/stream/s3/failover/DefaultFailoverFactory.java +++ b/core/src/main/scala/kafka/log/stream/s3/failover/DefaultFailoverFactory.java @@ -84,6 +84,11 @@ public CompletableFuture> getObjects(long streamId, long return CompletableFuture.failedFuture(new UnsupportedOperationException()); } + @Override + public boolean isObjectExist(long l) { + throw new UnsupportedOperationException(); + } + public CompletableFuture> getServerObjects() { return CompletableFuture.failedFuture(new UnsupportedOperationException()); } diff --git a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java index 887454ddeb..225ea1b738 100644 --- a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java @@ -25,6 +25,7 @@ import org.apache.kafka.image.S3StreamsMetadataImage; import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.S3Object; +import org.apache.kafka.metadata.stream.S3ObjectState; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.raft.OffsetAndEpoch; import org.slf4j.Logger; @@ -177,6 +178,14 @@ public long endOffset() { } } + public boolean isObjectExist(long objectId) { + S3Object object = objectsImage.getObjectMetadata(objectId); + if (object == null) { + return false; + } + return object.getS3ObjectState() == S3ObjectState.COMMITTED; + } + // must access thread safe private CompletableFuture pendingFetch() { GetObjectsTask task = new GetObjectsTask(); diff --git a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java index b9b6f91df8..c7023bacdc 100644 --- a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java @@ -208,6 +208,11 @@ public CompletableFuture> getObjects(long streamId, long }); } + @Override + public boolean isObjectExist(long objectId) { + return this.metadataManager.isObjectExist(objectId); + } + @Override public CompletableFuture> getServerObjects() { try { diff --git a/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java b/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java index ce507b1f08..47e3aa0de2 100644 --- a/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java +++ b/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java @@ -143,7 +143,7 @@ public Optional get(String filePath, long position, int length) { if (entry.getKey() + entry.getValue().dataLength < position + length) { return Optional.empty(); } - lru.touch(new Key(filePath, cacheStartPosition)); + lru.touchIfExist(new Key(filePath, cacheStartPosition)); MappedByteBuffer cacheByteBuffer = this.cacheByteBuffer.duplicate(); long nextPosition = position; int remaining = length; diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index d1267b4370..6480e1dc92 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -250,6 +250,6 @@ extLibs += [ ] branchVersions += [ s3stream: { - require "1.0.4-s3stream" + branch = "main" } ]