diff --git a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java
index 4462255694..b9b6f91df8 100644
--- a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java
+++ b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java
@@ -187,7 +187,7 @@ public Builder toRequestBuilder() {
                     throw code.exception();
                 case STREAM_NOT_EXIST:
                 case STREAM_FENCED:
-                    LOGGER.error("Stream fenced or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
+                    LOGGER.warn("Stream fenced or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
                     throw Errors.forCode(resp.errorCode()).exception();
                 default:
                     LOGGER.error("Error while committing stream object: {}, code: {}, retry later", request, code);
diff --git a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
index 660b118d35..8e3d664bcd 100644
--- a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
+++ b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
@@ -283,7 +283,7 @@ private synchronized CompletableFuture<Void> doCompaction(boolean force) {
             return CompletableFuture.completedFuture(null);
         }
         double hollowRate = 1 - (double) metaCache.size() / size;
-        if (!force && hollowRate >= COMPACTION_HOLLOW_RATE) {
+        if (!force && hollowRate < COMPACTION_HOLLOW_RATE) {
             return CompletableFuture.completedFuture(null);
         }
         MetadataValue last = null;
@@ -307,10 +307,7 @@ private synchronized CompletableFuture<Void> doCompaction(boolean force) {
         CompletableFuture<Void> overwriteCf = CompletableFuture.allOf(overwrite.stream().map(this::append).toArray(CompletableFuture[]::new));
         return overwriteCf.thenAccept(nil -> {
             OptionalLong minOffset = metaCache.values().stream().mapToLong(v -> v.offset).min();
-            minOffset.ifPresent(offset -> {
-                trim(offset);
-                LOGGER.info("compact streamId={} done, compact from [{}, {}) to [{}, {})", streamId(), startOffset, endOffset, offset, nextOffset());
-            });
+            minOffset.ifPresent(this::trim);
         });
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index f53ae12655..31eb27723b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -365,7 +365,6 @@ public ControllerResult<CloseStreamResponse> closeStream(int nodeId, long nodeEp
     }
 
     public ControllerResult<TrimStreamResponse> trimStream(int nodeId, long nodeEpoch, TrimStreamRequest request) {
-        // TODO: speed up offset updating
        long epoch = request.streamEpoch();
        long streamId = request.streamId();
        long newStartOffset = request.newStartOffset();
@@ -450,30 +449,6 @@ public ControllerResult<TrimStreamResponse> trimStream(int nodeId, long nodeEpoc
                 .setEpoch(range.epoch())
                 .setRangeIndex(rangeIndex), (short) 0));
         });
-        // remove stream object
-        streamMetadata.streamObjects().entrySet().stream().forEach(it -> {
-            Long objectId = it.getKey();
-            S3StreamObject streamObject = it.getValue();
-            long streamStartOffset = streamObject.streamOffsetRange().startOffset();
-            long streamEndOffset = streamObject.streamOffsetRange().endOffset();
-            if (newStartOffset <= streamStartOffset) {
-                return;
-            }
-            if (newStartOffset >= streamEndOffset) {
-                // remove stream object
-                records.add(new ApiMessageAndVersion(new RemoveS3StreamObjectRecord()
-                    .setStreamId(streamId)
-                    .setObjectId(objectId), (short) 0));
-                ControllerResult<Boolean> markDestroyResult = this.s3ObjectControlManager.markDestroyObjects(
-                    Collections.singletonList(objectId));
-                if (!markDestroyResult.response()) {
-                    log.error("[TrimStream] Mark destroy stream object: {} failed", objectId);
-                    resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
-                    return;
-                }
-                records.addAll(markDestroyResult.records());
-            }
-        });
         if (resp.errorCode() != Errors.NONE.code()) {
             return ControllerResult.of(Collections.emptyList(), resp);
         }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
index ba9bdbad78..6b9daa9b1f 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
@@ -850,7 +850,7 @@ public void testTrim() {
         assertEquals(1, rangeMetadata.rangeIndex());
         assertEquals(60, rangeMetadata.startOffset());
         assertEquals(70, rangeMetadata.endOffset());
-        assertEquals(0, streamMetadata.streamObjects().size());
+        assertEquals(1, streamMetadata.streamObjects().size());
 
         // 3. trim stream0 to [100, ..)
         trimRequest = new TrimStreamRequest()
@@ -869,7 +869,7 @@ public void testTrim() {
         assertEquals(1, rangeMetadata.rangeIndex());
         assertEquals(70, rangeMetadata.startOffset());
         assertEquals(70, rangeMetadata.endOffset());
-        assertEquals(0, streamMetadata.streamObjects().size());
+        assertEquals(1, streamMetadata.streamObjects().size());
 
         // 5. commit stream set object with stream0-[70, 100)
         CommitStreamSetObjectRequestData requestData = new CommitStreamSetObjectRequestData()