@@ -187,7 +187,7 @@ public Builder toRequestBuilder() {
                 throw code.exception();
             case STREAM_NOT_EXIST:
             case STREAM_FENCED:
-                LOGGER.error("Stream fenced or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
+                LOGGER.warn("Stream fenced or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
                 throw Errors.forCode(resp.errorCode()).exception();
             default:
                 LOGGER.error("Error while committing stream object: {}, code: {}, retry later", request, code);
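For context on the hunk above: STREAM_FENCED and STREAM_NOT_EXIST now log at warn instead of error while the exception is still rethrown, and other codes keep the error-level "retry later" log. A minimal sketch of that policy with a hypothetical handleCommitError helper, assuming (as the diff implies) that this fork's Errors enum defines STREAM_NOT_EXIST and STREAM_FENCED; this is an illustration, not the project's actual class:

import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CommitErrorLoggingSketch {
    private static final Logger LOGGER = LoggerFactory.getLogger(CommitErrorLoggingSketch.class);

    // Hypothetical helper mirroring the switch in the hunk above: a fenced or
    // deleted stream is an expected condition (warn + rethrow); anything else
    // is unexpected (error; the caller's retry handling is omitted here).
    static void handleCommitError(short errorCode, String request) {
        Errors code = Errors.forCode(errorCode);
        switch (code) {
            case NONE:
                return;
            case STREAM_NOT_EXIST:
            case STREAM_FENCED:
                LOGGER.warn("Stream fenced or not exist: {}, code: {}", request, code);
                throw code.exception();
            default:
                LOGGER.error("Error while committing stream object: {}, code: {}, retry later", request, code);
        }
    }
}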
core/src/main/scala/kafka/log/streamaspect/MetaStream.java (2 additions, 5 deletions)
@@ -283,7 +283,7 @@ private synchronized CompletableFuture<Void> doCompaction(boolean force) {
             return CompletableFuture.completedFuture(null);
         }
         double hollowRate = 1 - (double) metaCache.size() / size;
-        if (!force && hollowRate >= COMPACTION_HOLLOW_RATE) {
+        if (!force && hollowRate < COMPACTION_HOLLOW_RATE) {
            return CompletableFuture.completedFuture(null);
         }
         MetadataValue last = null;
@@ -307,10 +307,7 @@ private synchronized CompletableFuture<Void> doCompaction(boolean force) {
         CompletableFuture<Void> overwriteCf = CompletableFuture.allOf(overwrite.stream().map(this::append).toArray(CompletableFuture[]::new));
         return overwriteCf.thenAccept(nil -> {
             OptionalLong minOffset = metaCache.values().stream().mapToLong(v -> v.offset).min();
-            minOffset.ifPresent(offset -> {
-                trim(offset);
-                LOGGER.info("compact streamId={} done, compact from [{}, {}) to [{}, {})", streamId(), startOffset, endOffset, offset, nextOffset());
-            });
+            minOffset.ifPresent(this::trim);
         });
     }

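In the MetaStream hunks above, the compaction guard is inverted so that a non-forced compaction is skipped only while the hollow rate is still below COMPACTION_HOLLOW_RATE, and the post-compaction step is reduced to trimming to the smallest live offset. A standalone sketch of that decision logic; the class name, the 0.6 threshold, and the map of live offsets are illustrative assumptions, not taken from the diff:

import java.util.Map;
import java.util.OptionalLong;
import java.util.function.LongConsumer;

public final class CompactionGuardSketch {
    // Assumed value for illustration; the real COMPACTION_HOLLOW_RATE lives in MetaStream.
    private static final double COMPACTION_HOLLOW_RATE = 0.6;

    // True when compaction should run: either forced, or the fraction of dead
    // entries (the hollow rate) has reached the threshold. This mirrors the
    // corrected guard in doCompaction, which now skips only while the stream
    // is still dense enough.
    static boolean shouldCompact(boolean force, int liveEntries, long totalEntries) {
        if (totalEntries <= 0) {
            return force;
        }
        double hollowRate = 1 - (double) liveEntries / totalEntries;
        return force || hollowRate >= COMPACTION_HOLLOW_RATE;
    }

    // After live entries are rewritten, trim everything below the smallest
    // live offset, as minOffset.ifPresent(this::trim) does in the hunk above.
    static void trimToMinLiveOffset(Map<String, Long> liveOffsets, LongConsumer trim) {
        OptionalLong minOffset = liveOffsets.values().stream().mapToLong(Long::longValue).min();
        minOffset.ifPresent(trim);
    }
}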
@@ -365,7 +365,6 @@ public ControllerResult<CloseStreamResponse> closeStream
     }

     public ControllerResult<TrimStreamResponse> trimStream(int nodeId, long nodeEpoch, TrimStreamRequest request) {
-        // TODO: speed up offset updating
         long epoch = request.streamEpoch();
         long streamId = request.streamId();
         long newStartOffset = request.newStartOffset();
@@ -450,30 +449,6 @@ public ControllerResult<TrimStreamResponse> trimStream
                 .setEpoch(range.epoch())
                 .setRangeIndex(rangeIndex), (short) 0));
         });
-        // remove stream object
-        streamMetadata.streamObjects().entrySet().stream().forEach(it -> {
-            Long objectId = it.getKey();
-            S3StreamObject streamObject = it.getValue();
-            long streamStartOffset = streamObject.streamOffsetRange().startOffset();
-            long streamEndOffset = streamObject.streamOffsetRange().endOffset();
-            if (newStartOffset <= streamStartOffset) {
-                return;
-            }
-            if (newStartOffset >= streamEndOffset) {
-                // remove stream object
-                records.add(new ApiMessageAndVersion(new RemoveS3StreamObjectRecord()
-                    .setStreamId(streamId)
-                    .setObjectId(objectId), (short) 0));
-                ControllerResult<Boolean> markDestroyResult = this.s3ObjectControlManager.markDestroyObjects(
-                    Collections.singletonList(objectId));
-                if (!markDestroyResult.response()) {
-                    log.error("[TrimStream] Mark destroy stream object: {} failed", objectId);
-                    resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
-                    return;
-                }
-                records.addAll(markDestroyResult.records());
-            }
-        });
         if (resp.errorCode() != Errors.NONE.code()) {
             return ControllerResult.of(Collections.emptyList(), resp);
         }
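The removed block above used to destroy, inside trimStream itself, every stream object that fell entirely below the new start offset; after this change the controller only advances offsets and updates ranges, leaving fully-trimmed stream objects in place (their cleanup is presumably handled by another path, not shown in this diff). A hedged sketch of the classification the removed code performed, with hypothetical names:

// Illustrative only: classifies a stream object against a new start offset the
// way the removed block did. Under this change, FULLY_TRIMMED objects are no
// longer removed and mark-destroyed inline during trimStream.
public final class TrimClassificationSketch {
    enum Classification { UNAFFECTED, PARTIALLY_TRIMMED, FULLY_TRIMMED }

    static Classification classify(long newStartOffset, long objectStartOffset, long objectEndOffset) {
        if (newStartOffset <= objectStartOffset) {
            return Classification.UNAFFECTED;      // object starts at or after the new start offset
        }
        if (newStartOffset >= objectEndOffset) {
            return Classification.FULLY_TRIMMED;   // previously removed and mark-destroyed here
        }
        return Classification.PARTIALLY_TRIMMED;   // overlaps the new start offset; kept either way
    }
}

This is also why the testTrim assertions below now expect one remaining stream object instead of zero after trimming past its range.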
@@ -850,7 +850,7 @@ public void testTrim() {
         assertEquals(1, rangeMetadata.rangeIndex());
         assertEquals(60, rangeMetadata.startOffset());
         assertEquals(70, rangeMetadata.endOffset());
-        assertEquals(0, streamMetadata.streamObjects().size());
+        assertEquals(1, streamMetadata.streamObjects().size());

         // 3. trim stream0 to [100, ..)
         trimRequest = new TrimStreamRequest()
@@ -869,7 +869,7 @@
         assertEquals(1, rangeMetadata.rangeIndex());
         assertEquals(70, rangeMetadata.startOffset());
         assertEquals(70, rangeMetadata.endOffset());
-        assertEquals(0, streamMetadata.streamObjects().size());
+        assertEquals(1, streamMetadata.streamObjects().size());

         // 5. commit stream set object with stream0-[70, 100)
         CommitStreamSetObjectRequestData requestData = new CommitStreamSetObjectRequestData()