diff --git a/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java index 90cc761c72..c833799c1c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java @@ -16,15 +16,19 @@ import com.automq.stream.s3.objects.ObjectAttributes; import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection; import com.automq.stream.utils.biniarysearch.ComparableItem; -import io.netty.buffer.ByteBuf; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE; import static com.automq.stream.s3.CompositeObject.FOOTER_MAGIC; @@ -41,6 +45,7 @@ public class CompositeObjectReader implements ObjectReader { private CompletableFuture basicObjectInfoCf; private CompletableFuture sizeCf; private final AtomicInteger refCount = new AtomicInteger(1); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); public CompositeObjectReader(S3ObjectMetadata objectMetadata, RangeReader rangeReader) { this.objectMetadata = objectMetadata; @@ -59,6 +64,9 @@ public String objectKey() { @Override public synchronized CompletableFuture basicObjectInfo() { + if (isShutdown.get()) { + return CompletableFuture.failedFuture(new IllegalStateException("ObjectReader is already shutdown")); + } if (basicObjectInfoCf == null) { this.basicObjectInfoCf = new CompletableFuture<>(); this.basicObjectInfoCf.exceptionally(ex -> { @@ -101,6 +109,9 @@ public synchronized CompletableFuture size() { } public synchronized void close0() { + if (!isShutdown.compareAndSet(false, true)) { + return; + } if (basicObjectInfoCf != null) { basicObjectInfoCf.thenAccept(BasicObjectInfo::close); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java index ad51d5bf59..1008b82658 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +90,7 @@ class DefaultObjectReader implements ObjectReader { private CompletableFuture basicObjectInfoCf; private CompletableFuture sizeCf; private final AtomicInteger refCount = new AtomicInteger(1); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); public DefaultObjectReader(S3ObjectMetadata metadata, ObjectStorage objectStorage) { this.metadata = metadata; @@ -105,6 +107,9 @@ public String objectKey() { } public synchronized CompletableFuture basicObjectInfo() { + if (isShutdown.get()) { + return CompletableFuture.failedFuture(new IllegalStateException("ObjectReader is already shutdown")); + } if (basicObjectInfoCf == null) { this.basicObjectInfoCf = new CompletableFuture<>(); asyncGetBasicObjectInfo(); @@ -186,6 +191,9 @@ public synchronized CompletableFuture size() { } public synchronized void close0() { + if (!isShutdown.compareAndSet(false, true)) { + return; + } if (basicObjectInfoCf != null) { basicObjectInfoCf.thenAccept(BasicObjectInfo::close); }