From e56b62dfd3ae64325a60f5fe74df0b8dd93eb8e9 Mon Sep 17 00:00:00 2001 From: lifepuzzlefun Date: Tue, 9 Jul 2024 10:27:17 +0800 Subject: [PATCH] fix(s3stream): fix streamsetobject force split fail --- .../kafka/log/stream/s3/objects/ControllerObjectManager.java | 4 +++- .../java/com/automq/stream/s3/compact/CompactionManager.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java index 720ae66123..fd7810bf91 100644 --- a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java @@ -47,6 +47,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID; + public class ControllerObjectManager implements ObjectManager { private final static Logger LOGGER = LoggerFactory.getLogger(ControllerObjectManager.class); @@ -126,7 +128,7 @@ public CompletableFuture commitStreamSetObject0( .map(s -> Convertor.toStreamObjectInRequest(s, version.get())).collect(Collectors.toList())) .setCompactedObjectIds(commitStreamSetObjectRequest.getCompactedObjectIds()) .setFailoverMode(failoverMode); - if (commitStreamSetObjectRequest.getAttributes() == ObjectAttributes.UNSET.attributes()) { + if (commitStreamSetObjectRequest.getObjectId() != NOOP_OBJECT_ID && commitStreamSetObjectRequest.getAttributes() == ObjectAttributes.UNSET.attributes()) { throw new IllegalArgumentException("[BUG]attributes must be set"); } if (version.get().isObjectAttributesSupported()) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 1add245756..ede3ac9bcd 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -60,6 +60,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; +import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID; + public class CompactionManager { private static final int MIN_COMPACTION_DELAY_MS = 60000; // Max refill rate for Bucket: 1 token per nanosecond @@ -529,7 +531,7 @@ CommitStreamSetObjectRequest buildSplitRequest(List streamMetada } CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest(); - request.setObjectId(-1L); + request.setObjectId(NOOP_OBJECT_ID); // wait for all force split objects to complete synchronized (this) {