From ebd5e51c1e988eb8519e899759cea6b8f2da3c63 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Fri, 8 Nov 2024 11:36:00 +0800 Subject: [PATCH] fix(issue2139): prevent read object info from closed ObjectReader Signed-off-by: Shichao Nie --- .../automq/stream/s3/CompositeObjectReader.java | 17 ++++++++++++++--- .../java/com/automq/stream/s3/ObjectReader.java | 8 ++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java index 90cc761c72..c833799c1c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java @@ -16,15 +16,19 @@ import com.automq.stream.s3.objects.ObjectAttributes; import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection; import com.automq.stream.utils.biniarysearch.ComparableItem; -import io.netty.buffer.ByteBuf; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE; import static com.automq.stream.s3.CompositeObject.FOOTER_MAGIC; @@ -41,6 +45,7 @@ public class CompositeObjectReader implements ObjectReader { private CompletableFuture basicObjectInfoCf; private CompletableFuture sizeCf; private final AtomicInteger refCount = new AtomicInteger(1); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); public CompositeObjectReader(S3ObjectMetadata objectMetadata, RangeReader rangeReader) { this.objectMetadata = objectMetadata; @@ -59,6 +64,9 @@ public String objectKey() { @Override public synchronized CompletableFuture basicObjectInfo() { + if (isShutdown.get()) { + return CompletableFuture.failedFuture(new IllegalStateException("ObjectReader is already shutdown")); + } if (basicObjectInfoCf == null) { this.basicObjectInfoCf = new CompletableFuture<>(); this.basicObjectInfoCf.exceptionally(ex -> { @@ -101,6 +109,9 @@ public synchronized CompletableFuture size() { } public synchronized void close0() { + if (!isShutdown.compareAndSet(false, true)) { + return; + } if (basicObjectInfoCf != null) { basicObjectInfoCf.thenAccept(BasicObjectInfo::close); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java index ad51d5bf59..1008b82658 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +90,7 @@ class DefaultObjectReader implements ObjectReader { private CompletableFuture basicObjectInfoCf; private CompletableFuture sizeCf; private final AtomicInteger refCount = new AtomicInteger(1); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); public DefaultObjectReader(S3ObjectMetadata metadata, ObjectStorage objectStorage) { this.metadata = metadata; @@ -105,6 +107,9 @@ public String objectKey() { } public synchronized CompletableFuture basicObjectInfo() { + if (isShutdown.get()) { + return CompletableFuture.failedFuture(new IllegalStateException("ObjectReader is already shutdown")); + } if (basicObjectInfoCf == null) { this.basicObjectInfoCf = new CompletableFuture<>(); asyncGetBasicObjectInfo(); @@ -186,6 +191,9 @@ public synchronized CompletableFuture size() { } public synchronized void close0() { + if (!isShutdown.compareAndSet(false, true)) { + return; + } if (basicObjectInfoCf != null) { basicObjectInfoCf.thenAccept(BasicObjectInfo::close); }