Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.errors.s3;

import org.apache.kafka.common.errors.ApiException;

/**
 * Thrown when a request repeats an operation whose effect has already been applied
 * (for example, committing an S3 object that is already in the COMMITTED state).
 * Maps to the {@code REDUNDANT_OPERATION} error code in
 * {@code org.apache.kafka.common.protocol.Errors}; callers in this change set treat
 * it as a benign no-op and return success rather than propagating a failure.
 */
public class RedundantOperationException extends ApiException {

// Single-argument constructor only: Errors registers this type via the
// RedundantOperationException::new method reference, which requires (String).
public RedundantOperationException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.s3.ObjectNotExistException;
import org.apache.kafka.common.errors.s3.RedundantOperationException;
import org.apache.kafka.common.errors.s3.StreamExistException;
import org.apache.kafka.common.errors.s3.StreamFencedException;
import org.apache.kafka.common.errors.s3.StreamInnerErrorException;
Expand Down Expand Up @@ -387,7 +388,7 @@ public enum Errors {
OBJECT_NOT_EXIST(504, "The object does not exist.", ObjectNotExistException::new),
STREAM_NOT_OPENED(505, "The stream is not opened.", StreamNotOpenedException::new),
STREAM_NOT_CLOSED(506, "The stream is not closed.", StreamNotClosedException::new),

REDUNDANT_OPERATION(507, "The operation is redundant.", RedundantOperationException::new),



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerResult;
Expand Down Expand Up @@ -154,11 +155,20 @@ public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3Obje
return ControllerResult.atomicOf(records, response);
}

public ControllerResult<Boolean> commitObject(long objectId, long objectSize) {
public ControllerResult<Errors> commitObject(long objectId, long objectSize) {
S3Object object = this.objectsMetadata.get(objectId);
if (object == null) {
log.error("object {} not exist when commit wal object", objectId);
return ControllerResult.of(Collections.emptyList(), false);
return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
}
// verify the state
if (object.getS3ObjectState() == S3ObjectState.COMMITTED) {
log.warn("object {} already committed", objectId);
return ControllerResult.of(Collections.emptyList(), Errors.REDUNDANT_OPERATION);
}
if (object.getS3ObjectState() != S3ObjectState.PREPARED) {
log.error("object {} is not prepared but try to commit", objectId);
return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
}
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
Expand All @@ -168,14 +178,14 @@ public ControllerResult<Boolean> commitObject(long objectId, long objectSize) {
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(System.currentTimeMillis());
return ControllerResult.of(List.of(
new ApiMessageAndVersion(record, (short) 0)), true);
new ApiMessageAndVersion(record, (short) 0)), Errors.NONE);
}

public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
List<ApiMessageAndVersion> records = new ArrayList<>();
for (Long objectId : objects) {
S3Object object = this.objectsMetadata.get(objectId);
if (object == null) {
if (object == null || object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
log.error("object {} not exist when mark destroy object", objectId);
// TODO: Maybe we can ignore this situation, since the object may already have been destroyed?
return ControllerResult.of(Collections.emptyList(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,19 +362,25 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
long objectSize = data.objectSize();
long orderId = data.orderId();
List<ObjectStreamRange> streamRanges = data.objectStreamRanges();
List<Long> compactedObjectIds = data.compactedObjectIds();
// commit object
ControllerResult<Boolean> commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize);
if (!commitResult.response()) {
ControllerResult<Errors> commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize);
if (commitResult.response() == Errors.OBJECT_NOT_EXIST) {
log.error("[CommitWALObject]: object {} not exist when commit wal object", objectId);
resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
if (commitResult.response() == Errors.REDUNDANT_OPERATION) {
// regard it as redundant commit operation, just return success
log.warn("[CommitWALObject]: object {} already committed", objectId);
return ControllerResult.of(Collections.emptyList(), resp);
}
records.addAll(commitResult.records());
// mark destroy compacted object
if (data.compactedObjectIds() != null && !data.compactedObjectIds().isEmpty()) {
ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(data.compactedObjectIds());
if (compactedObjectIds != null && !compactedObjectIds.isEmpty()) {
ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(compactedObjectIds);
if (!destroyResult.response()) {
log.error("[CommitWALObject]: Mark destroy compacted objects: {} failed", data.compactedObjectIds());
log.error("[CommitWALObject]: Mark destroy compacted objects: {} failed", compactedObjectIds);
resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
Expand Down Expand Up @@ -408,10 +414,12 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
long endOffset = obj.endOffset();
records.add(new S3StreamObject(obj.objectId(), obj.objectSize(), streamId, startOffset, endOffset).toRecord());
});
// generate compacted objects' remove record
data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
.setObjectId(id), (short) 0)));
log.info("[CommitWALObject]: broker: {} commit wal object: {} success, compacted objects: {}", brokerId, objectId, data.compactedObjectIds());
if (compactedObjectIds != null && !compactedObjectIds.isEmpty()) {
// generate compacted objects' remove record
compactedObjectIds.forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
.setObjectId(id), (short) 0)));
}
log.info("[CommitWALObject]: broker: {} commit wal object: {} success, compacted objects: {}", brokerId, objectId, compactedObjectIds);
return ControllerResult.atomicOf(records, resp);
}

Expand All @@ -426,12 +434,17 @@ public ControllerResult<CommitStreamObjectResponseData> commitStreamObject(Commi
CommitStreamObjectResponseData resp = new CommitStreamObjectResponseData();

// commit object
ControllerResult<Boolean> commitResult = this.s3ObjectControlManager.commitObject(streamObjectId, objectSize);
if (!commitResult.response()) {
ControllerResult<Errors> commitResult = this.s3ObjectControlManager.commitObject(streamObjectId, objectSize);
if (commitResult.response() == Errors.OBJECT_NOT_EXIST) {
log.error("[CommitStreamObject]: object {} not exist when commit stream object", streamObjectId);
resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
if (commitResult.response() == Errors.REDUNDANT_OPERATION) {
// regard it as redundant commit operation, just return success
log.warn("[CommitStreamObject]: object {} already committed", streamObjectId);
return ControllerResult.of(Collections.emptyList(), resp);
}

// mark destroy compacted object
if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.kafka.controller;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -158,18 +157,23 @@ public void testCommitObject() {
prepareOneObject(60 * 1000);

// 2. commit an object which not exist in controller
ControllerResult<Boolean> result1 = manager.commitObject(1, 1024);
assertFalse(result1.response());
ControllerResult<Errors> result1 = manager.commitObject(1, 1024);
assertEquals(Errors.OBJECT_NOT_EXIST, result1.response());
assertEquals(0, result1.records().size());

// 3. commit a valid object
ControllerResult<Boolean> result2 = manager.commitObject(0, 1024);
assertTrue(result2.response());
ControllerResult<Errors> result2 = manager.commitObject(0, 1024);
assertEquals(Errors.NONE, result2.response());
assertEquals(1, result2.records().size());
S3ObjectRecord record = (S3ObjectRecord) result2.records().get(0).message();
manager.replay(record);

// 4. verify the object is committed
// 4. commit again
ControllerResult<Errors> result3 = manager.commitObject(0, 1024);
assertEquals(Errors.REDUNDANT_OPERATION, result3.response());
assertEquals(0, result3.records().size());

// 5. verify the object is committed
assertEquals(1, manager.objectsMetadata().size());
S3Object object = manager.objectsMetadata().get(0L);
assertEquals(S3ObjectState.COMMITTED, object.getS3ObjectState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ public void testCommitWalBasic() {
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).then(ink -> {
long objectId = ink.getArgument(0);
if (objectId == 1) {
return ControllerResult.of(Collections.emptyList(), false);
return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
}
return ControllerResult.of(Collections.emptyList(), true);
return ControllerResult.of(Collections.emptyList(), Errors.NONE);
});
// 1. create and open stream_0
CreateStreamRequestData request0 = new CreateStreamRequestData();
Expand Down Expand Up @@ -376,7 +376,7 @@ private void createAndOpenStream(int brokerId, long epoch) {

@Test
public void testCommitWalCompacted() {
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));

// 1. create and open stream_0 and stream_1
Expand Down Expand Up @@ -449,7 +449,8 @@ public void testCommitWalCompacted() {
assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
assertEquals(300L, streamsOffset.streamsOffset().get(1).endOffset());

// 6. commit a second level wal object which compact wal_0 and wal_1
// 6. commit an invalid wal object whose compacted-object list contains a destroyed or nonexistent wal object
Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false));
List<ObjectStreamRange> streamRanges2 = List.of(
new ObjectStreamRange()
.setStreamId(STREAM0)
Expand All @@ -467,12 +468,25 @@ public void testCommitWalCompacted() {
.setBrokerId(BROKER0)
.setObjectSize(999)
.setObjectStreamRanges(streamRanges2)
.setCompactedObjectIds(List.of(0L, 1L));
.setCompactedObjectIds(List.of(0L, 1L, 10L));
ControllerResult<CommitWALObjectResponseData> result6 = manager.commitWALObject(commitRequest2);
assertEquals(Errors.STREAM_INNER_ERROR.code(), result6.response().errorCode());
assertEquals(0, result6.records().size());
Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));

// 7. commit a second level wal object which compact wal_0 and wal_1
commitRequest2 = new CommitWALObjectRequestData()
.setObjectId(2L)
.setOrderId(0L)
.setBrokerId(BROKER0)
.setObjectSize(999)
.setObjectStreamRanges(streamRanges2)
.setCompactedObjectIds(List.of(0L, 1L));
result6 = manager.commitWALObject(commitRequest2);
assertEquals(Errors.NONE.code(), result6.response().errorCode());
replay(manager, result6.records());

// 7. fetch range end offset
// 8. fetch range end offset
streamsOffset = manager.getStreamsOffset(request);
assertEquals(2, streamsOffset.streamsOffset().size());
assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId());
Expand All @@ -482,15 +496,16 @@ public void testCommitWalCompacted() {
assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
assertEquals(300L, streamsOffset.streamsOffset().get(1).endOffset());

// 8. verify compacted wal objects is removed
// 9. verify compacted wal objects is removed
assertEquals(1, manager.brokersMetadata().get(BROKER0).walObjects().size());
assertEquals(2, manager.brokersMetadata().get(BROKER0).walObjects().get(2L).objectId());
assertEquals(0, manager.brokersMetadata().get(BROKER0).walObjects().get(2L).orderId());

}

@Test
public void testCommitWalWithStreamObject() {
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));

// 1. create and open stream_0 and stream_1
Expand Down Expand Up @@ -540,7 +555,7 @@ public void testCommitWalWithStreamObject() {

@Test
public void testCommitStreamObject() {
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));

// 1. create and open stream_0 and stream_1
Expand Down Expand Up @@ -621,6 +636,19 @@ public void testCommitStreamObject() {
assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
assertEquals(400L, streamsOffset.streamsOffset().get(1).endOffset());

// 6. compact a stream object from invalid source object
Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false));
streamObjectRequest = new CommitStreamObjectRequestData()
.setObjectId(5L)
.setStreamId(STREAM1)
.setStartOffset(400L)
.setEndOffset(1000L)
.setObjectSize(999)
.setSourceObjectIds(List.of(10L));
result2 = manager.commitStreamObject(streamObjectRequest);
assertEquals(Errors.STREAM_INNER_ERROR.code(), result2.response().errorCode());
replay(manager, result2.records());

// 7. verify stream objects
assertEquals(1, manager.streamsMetadata().get(STREAM1).streamObjects().size());
assertEquals(4L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).objectId());
Expand Down