diff --git a/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java index c833799c1c..0a527ced10 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java @@ -17,7 +17,6 @@ import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection; import com.automq.stream.utils.biniarysearch.ComparableItem; -import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +25,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.ByteBuf; diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java index 1008b82658..d363401725 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java @@ -21,7 +21,10 @@ import com.automq.stream.s3.operator.ObjectStorage.ReadOptions; import com.automq.stream.utils.CloseableIterator; import com.automq.stream.utils.biniarysearch.IndexBlockOrderedBytes; -import io.netty.buffer.ByteBuf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -32,8 +35,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE; import static com.automq.stream.s3.ByteBufAlloc.READ_INDEX_BLOCK; diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java index 3908ffcf16..0201d0f0d9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java @@ -11,14 +11,14 @@ package com.automq.stream.s3.cache; +import com.automq.stream.s3.metrics.S3StreamMetricsManager; +import com.automq.stream.s3.metrics.stats.AsyncLRUCacheStats; import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; - -import com.automq.stream.s3.metrics.S3StreamMetricsManager; -import com.automq.stream.s3.metrics.stats.AsyncLRUCacheStats; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +87,20 @@ public synchronized V get(K key) { return val; } + public synchronized V computeIfAbsent(K key, Function valueMapper) { + V value = cache.get(key); + if (value == null) { + value = valueMapper.apply(key); + if (value != null) { + put(key, value); + } + } + return value; + } + + public synchronized void inLockRun(Runnable runnable) { + runnable.run(); + } public synchronized boolean remove(K key) { V value = cache.get(key); diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java index 4ffaf7643a..20b56e81f9 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java @@ -15,6 +15,7 @@ import com.automq.stream.s3.cache.ObjectReaderLRUCache; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.operator.ObjectStorage; +import java.util.concurrent.atomic.AtomicReference; public class DefaultObjectReaderFactory implements ObjectReaderFactory { private static final int MAX_OBJECT_READER_SIZE = 100 * 1024 * 1024; // 100MB; @@ -29,12 +30,13 @@ public DefaultObjectReaderFactory(ObjectStorage objectStorage) { @Override public synchronized ObjectReader get(S3ObjectMetadata metadata) { - ObjectReader objectReader = objectReaders.get(metadata.objectId()); - if (objectReader == null) { - objectReader = ObjectReader.reader(metadata, objectStorage); - objectReaders.put(metadata.objectId(), objectReader); - } - return objectReader.retain(); + AtomicReference objectReaderRef = new AtomicReference<>(); + objectReaders.inLockRun(() -> { + ObjectReader objectReader = objectReaders.computeIfAbsent(metadata.objectId(), k -> ObjectReader.reader(metadata, objectStorage)); + objectReader.retain(); + objectReaderRef.set(objectReader); + }); + return objectReaderRef.get(); } @Override