diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 2990e07aa6..6bc5690eea 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -303,11 +303,11 @@ private Optional findStreamInStreamSetObject(GetObjectsContex */ public List getStreamObjects(long streamId, long startOffset, long endOffset, int limit) { if (limit <= 0) { - throw new IllegalArgumentException("limit must be positive"); + throw new IllegalArgumentException(String.format("limit %d is invalid", limit)); } S3StreamMetadataImage stream = streamsMetadata.get(streamId); if (stream == null) { - throw new IllegalArgumentException("stream not found"); + throw new IllegalArgumentException(String.format("stream %d not found", streamId)); } List streamObjectsMetadata = stream.getStreamObjects(); if (streamObjectsMetadata == null || streamObjectsMetadata.isEmpty()) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java index 5e0160112c..9b88f10569 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java @@ -55,6 +55,7 @@ public class S3StreamClient implements StreamClient { private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class); + private static final long COMPACTION_COOLDOWN_AFTER_OPEN_STREAM = Systems.getEnvLong("AUTOMQ_STREAM_COMPACTION_COOLDOWN_AFTER_OPEN_STREAM", TimeUnit.MINUTES.toMillis(1)); private static final long MINOR_V1_COMPACTION_INTERVAL = Systems.getEnvLong("AUTOMQ_STREAM_COMPACTION_MINOR_V1_INTERVAL", TimeUnit.MINUTES.toMillis(10)); private static final long MAJOR_V1_COMPACTION_INTERVAL = Systems.getEnvLong("AUTOMQ_STREAM_COMPACTION_MAJOR_V1_INTERVAL", TimeUnit.MINUTES.toMillis(60)); /** @@ -237,6 +238,7 @@ private T runInLock(Supplier supplier) { public class StreamWrapper implements Stream { private final S3Stream stream; + private final long openedTimestamp = System.currentTimeMillis(); private long lastMinorCompactionTimestamp = System.currentTimeMillis(); private long lastMajorCompactionTimestamp = System.currentTimeMillis(); private long lastMinorV1CompactionTimestamp = System.currentTimeMillis(); @@ -323,16 +325,20 @@ public void compact(CompactionHint hint) { // so we need to check if the stream is closed before starting the compaction. return; } + long now = System.currentTimeMillis(); + if (now - openedTimestamp < COMPACTION_COOLDOWN_AFTER_OPEN_STREAM) { + // skip compaction in the first few minutes after the stream is opened + return; + } if (config.version().isStreamObjectCompactV1Supported()) { - compactV1(hint); + compactV1(hint, now); } else { - compactV0(); + compactV0(now); } } - private void compactV0() { - long now = System.currentTimeMillis(); + private void compactV0(long now) { if (now - lastMajorCompactionTimestamp > TimeUnit.MINUTES.toMillis(config.streamObjectCompactionIntervalMinutes())) { compact(MAJOR); lastMajorCompactionTimestamp = System.currentTimeMillis(); @@ -344,8 +350,7 @@ private void compactV0() { } } - private void compactV1(CompactionHint hint) { - long now = System.currentTimeMillis(); + private void compactV1(CompactionHint hint, long now) { if (now - lastMajorV1CompactionTimestamp > MAJOR_V1_COMPACTION_INTERVAL || hint.objectsCount >= MAJOR_V1_COMPACTION_MAX_OBJECT_THRESHOLD) { compact(MAJOR_V1); lastMajorV1CompactionTimestamp = System.currentTimeMillis();