@@ -365,7 +365,7 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
// commit object
ControllerResult<Boolean> commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize);
if (!commitResult.response()) {
log.error("object {} not exist when commit wal object", objectId);
log.error("[CommitWALObject]: object {} not exist when commit wal object", objectId);
resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
@@ -374,7 +374,7 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
if (data.compactedObjectIds() != null && !data.compactedObjectIds().isEmpty()) {
ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(data.compactedObjectIds());
if (!destroyResult.response()) {
log.error("Mark destroy compacted objects {} failed", String.join(",", data.compactedObjectIds().toArray(new String[0])));
log.error("[CommitWALObject]: Mark destroy compacted objects: {} failed", data.compactedObjectIds());
resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
@@ -411,12 +411,54 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
// generate compacted objects' remove record
data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
.setObjectId(id), (short) 0)));
log.info("[CommitWALObject]: broker: {} commit wal object {} success", brokerId, objectId);
log.info("[CommitWALObject]: broker: {} commit wal object: {} success, compacted objects: {}", brokerId, objectId, data.compactedObjectIds());
return ControllerResult.atomicOf(records, resp);
}

public ControllerResult<CommitStreamObjectResponseData> commitStreamObject(CommitStreamObjectRequestData data) {
throw new UnsupportedOperationException();
long streamObjectId = data.objectId();
long streamId = data.streamId();
long startOffset = data.startOffset();
long endOffset = data.endOffset();
long objectSize = data.objectSize();
List<Long> sourceObjectIds = data.sourceObjectIds();
List<ApiMessageAndVersion> records = new ArrayList<>();
CommitStreamObjectResponseData resp = new CommitStreamObjectResponseData();

// commit object
ControllerResult<Boolean> commitResult = this.s3ObjectControlManager.commitObject(streamObjectId, objectSize);
if (!commitResult.response()) {
log.error("[CommitStreamObject]: object {} not exist when commit stream object", streamObjectId);
resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code());
return ControllerResult.of(Collections.emptyList(), resp);
}

// mark destroy compacted object
if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) {
ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(sourceObjectIds);
if (!destroyResult.response()) {
log.error("[CommitStreamObject]: Mark destroy compacted objects: {} failed", sourceObjectIds);
resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
}

// generate stream object record
records.add(new ApiMessageAndVersion(new S3StreamObjectRecord()
.setObjectId(streamObjectId)
.setStreamId(streamId)
.setObjectSize(objectSize)
.setStartOffset(startOffset)
.setEndOffset(endOffset), (short) 0));

// generate compacted objects' remove record
if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) {
sourceObjectIds.forEach(id -> records.add(new ApiMessageAndVersion(new RemoveS3StreamObjectRecord()
.setObjectId(id)
.setStreamId(streamId), (short) 0)));
}
log.info("[CommitStreamObject]: stream object: {} commit success, compacted objects: {}", streamObjectId, sourceObjectIds);
return ControllerResult.atomicOf(records, resp);
}

public GetStreamsOffsetResponseData getStreamsOffset(GetStreamsOffsetRequestData data) {
@@ -541,6 +583,7 @@ public void replay(RemoveWALObjectRecord record) {
}
walMetadata.walObjects.remove(objectId);
}

public void replay(S3StreamObjectRecord record) {
long objectId = record.objectId();
long streamId = record.streamId();
@@ -29,6 +29,8 @@
import java.util.stream.Collectors;
import org.apache.kafka.common.message.CloseStreamRequestData;
import org.apache.kafka.common.message.CloseStreamResponseData;
import org.apache.kafka.common.message.CommitStreamObjectRequestData;
import org.apache.kafka.common.message.CommitStreamObjectResponseData;
import org.apache.kafka.common.message.CommitWALObjectRequestData;
import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange;
import org.apache.kafka.common.message.CommitWALObjectRequestData.StreamObject;
@@ -536,6 +538,96 @@ public void testCommitWalWithStreamObject() {
assertEquals(1, manager.streamsMetadata().get(STREAM1).streamObjects().size());
}

@Test
public void testCommitStreamObject() {
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));

// 1. create and open stream_0 and stream_1
createAndOpenStream(BROKER0, EPOCH0);
createAndOpenStream(BROKER0, EPOCH0);

// 2. commit a wal with stream_0 and a stream object with stream_1 that is split out from wal
List<ObjectStreamRange> streamRanges0 = List.of(
new ObjectStreamRange()
.setStreamId(STREAM0)
.setStreamEpoch(EPOCH0)
.setStartOffset(0L)
.setEndOffset(100L));
CommitWALObjectRequestData commitRequest0 = new CommitWALObjectRequestData()
.setObjectId(0L)
.setOrderId(0L)
.setBrokerId(BROKER0)
.setObjectSize(999)
.setObjectStreamRanges(streamRanges0)
.setStreamObjects(List.of(
new StreamObject()
.setStreamId(STREAM1)
.setObjectId(1L)
.setObjectSize(999)
.setStartOffset(0L)
.setEndOffset(200L)
));
ControllerResult<CommitWALObjectResponseData> result0 = manager.commitWALObject(commitRequest0);
assertEquals(Errors.NONE.code(), result0.response().errorCode());
replay(manager, result0.records());

// 3. commit a wal with stream_0 and a stream object with stream_1 that is split out from wal
List<ObjectStreamRange> streamRanges1 = List.of(
new ObjectStreamRange()
.setStreamId(STREAM0)
.setStreamEpoch(EPOCH0)
.setStartOffset(100L)
.setEndOffset(200L));
CommitWALObjectRequestData commitRequest1 = new CommitWALObjectRequestData()
.setObjectId(2L)
.setOrderId(1L)
.setBrokerId(BROKER0)
.setObjectSize(999)
.setObjectStreamRanges(streamRanges1)
.setStreamObjects(List.of(
new StreamObject()
.setStreamId(STREAM1)
.setObjectId(3L)
.setObjectSize(999)
.setStartOffset(200L)
.setEndOffset(400L)
));
ControllerResult<CommitWALObjectResponseData> result1 = manager.commitWALObject(commitRequest1);
assertEquals(Errors.NONE.code(), result1.response().errorCode());
replay(manager, result1.records());

// 4. compact these two stream objects
CommitStreamObjectRequestData streamObjectRequest = new CommitStreamObjectRequestData()
.setObjectId(4L)
.setStreamId(STREAM1)
.setStartOffset(0L)
.setEndOffset(400L)
.setObjectSize(999)
.setSourceObjectIds(List.of(1L, 3L));
ControllerResult<CommitStreamObjectResponseData> result2 = manager.commitStreamObject(streamObjectRequest);
assertEquals(Errors.NONE.code(), result2.response().errorCode());
replay(manager, result2.records());

// 5. fetch stream offset range
GetStreamsOffsetRequestData request = new GetStreamsOffsetRequestData()
.setStreamIds(List.of(STREAM0, STREAM1));
GetStreamsOffsetResponseData streamsOffset = manager.getStreamsOffset(request);
assertEquals(2, streamsOffset.streamsOffset().size());
assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId());
assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset());
assertEquals(200L, streamsOffset.streamsOffset().get(0).endOffset());
assertEquals(STREAM1, streamsOffset.streamsOffset().get(1).streamId());
assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
assertEquals(400L, streamsOffset.streamsOffset().get(1).endOffset());

// 6. verify stream objects
assertEquals(1, manager.streamsMetadata().get(STREAM1).streamObjects().size());
assertEquals(4L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).objectId());
assertEquals(0L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).streamIndex().getStartOffset());
assertEquals(400L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).streamIndex().getEndOffset());
}

private void commitFirstLevelWalObject(long objectId, long orderId, long streamId, long startOffset, long endOffset, long epoch, int brokerId) {
List<ObjectStreamRange> streamRanges0 = List.of(new ObjectStreamRange()
.setStreamId(streamId)
@@ -586,7 +678,7 @@ private void replay(StreamControlManager manager, List<ApiMessageAndVersion> rec
case S3_STREAM_OBJECT_RECORD:
manager.replay((S3StreamObjectRecord) message);
break;
case REMOVE_S3_OBJECT_RECORD:
case REMOVE_S3_STREAM_OBJECT_RECORD:
manager.replay((RemoveS3StreamObjectRecord) message);
break;
default: