Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,11 @@ private Optional<StreamOffsetRange> findStreamInStreamSetObject(GetObjectsContex
*/
public List<S3StreamObject> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
if (limit <= 0) {
throw new IllegalArgumentException("limit must be positive");
throw new IllegalArgumentException(String.format("limit %d is invalid", limit));
}
S3StreamMetadataImage stream = streamsMetadata.get(streamId);
if (stream == null) {
throw new IllegalArgumentException("stream not found");
throw new IllegalArgumentException(String.format("stream %d not found", streamId));
}
List<S3StreamObject> streamObjectsMetadata = stream.getStreamObjects();
if (streamObjectsMetadata == null || streamObjectsMetadata.isEmpty()) {
Expand Down
17 changes: 11 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

public class S3StreamClient implements StreamClient {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class);
private static final long COMPACTION_COOLDOWN_AFTER_OPEN_STREAM = Systems.getEnvLong("AUTOMQ_STREAM_COMPACTION_COOLDOWN_AFTER_OPEN_STREAM", TimeUnit.MINUTES.toMillis(1));
private static final long MINOR_V1_COMPACTION_INTERVAL = Systems.getEnvLong("AUTOMQ_STREAM_COMPACTION_MINOR_V1_INTERVAL", TimeUnit.MINUTES.toMillis(10));
private static final long MAJOR_V1_COMPACTION_INTERVAL = Systems.getEnvLong("AUTOMQ_STREAM_COMPACTION_MAJOR_V1_INTERVAL", TimeUnit.MINUTES.toMillis(60));
/**
Expand Down Expand Up @@ -237,6 +238,7 @@ private <T> T runInLock(Supplier<T> supplier) {

public class StreamWrapper implements Stream {
private final S3Stream stream;
private final long openedTimestamp = System.currentTimeMillis();
private long lastMinorCompactionTimestamp = System.currentTimeMillis();
private long lastMajorCompactionTimestamp = System.currentTimeMillis();
private long lastMinorV1CompactionTimestamp = System.currentTimeMillis();
Expand Down Expand Up @@ -323,16 +325,20 @@ public void compact(CompactionHint hint) {
// so we need to check if the stream is closed before starting the compaction.
return;
}
long now = System.currentTimeMillis();
if (now - openedTimestamp < COMPACTION_COOLDOWN_AFTER_OPEN_STREAM) {
// skip compaction in the first few minutes after the stream is opened
return;
}
if (config.version().isStreamObjectCompactV1Supported()) {
compactV1(hint);
compactV1(hint, now);
} else {
compactV0();
compactV0(now);
}

}

private void compactV0() {
long now = System.currentTimeMillis();
private void compactV0(long now) {
if (now - lastMajorCompactionTimestamp > TimeUnit.MINUTES.toMillis(config.streamObjectCompactionIntervalMinutes())) {
compact(MAJOR);
lastMajorCompactionTimestamp = System.currentTimeMillis();
Expand All @@ -344,8 +350,7 @@ private void compactV0() {
}
}

private void compactV1(CompactionHint hint) {
long now = System.currentTimeMillis();
private void compactV1(CompactionHint hint, long now) {
if (now - lastMajorV1CompactionTimestamp > MAJOR_V1_COMPACTION_INTERVAL || hint.objectsCount >= MAJOR_V1_COMPACTION_MAX_OBJECT_THRESHOLD) {
compact(MAJOR_V1);
lastMajorV1CompactionTimestamp = System.currentTimeMillis();
Expand Down