diff --git a/clients/src/main/java/org/apache/kafka/common/errors/s3/RedundantOperationException.java b/clients/src/main/java/org/apache/kafka/common/errors/s3/RedundantOperationException.java
new file mode 100644
index 0000000000..5f7eb4c3ec
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/s3/RedundantOperationException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors.s3;
+
+import org.apache.kafka.common.errors.ApiException;
+
+public class RedundantOperationException extends ApiException {
+
+    public RedundantOperationException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 6d33e7632e..ee8c9978fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -128,6 +128,7 @@
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.s3.ObjectNotExistException;
+import org.apache.kafka.common.errors.s3.RedundantOperationException;
 import org.apache.kafka.common.errors.s3.StreamExistException;
 import org.apache.kafka.common.errors.s3.StreamFencedException;
 import org.apache.kafka.common.errors.s3.StreamInnerErrorException;
@@ -387,7 +388,7 @@ public enum Errors {
     OBJECT_NOT_EXIST(504, "The object does not exist.", ObjectNotExistException::new),
     STREAM_NOT_OPENED(505, "The stream is not opened.", StreamNotOpenedException::new),
     STREAM_NOT_CLOSED(506, "The stream is not closed.", StreamNotClosedException::new),
-
+    REDUNDANT_OPERATION(507, "The operation is redundant.", RedundantOperationException::new),
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
index 0ba64bb1cf..290898a9ad 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
@@ -35,6 +35,7 @@
 import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
 import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
 import org.apache.kafka.common.metadata.S3ObjectRecord;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.controller.ControllerRequestContext;
 import org.apache.kafka.controller.ControllerResult;
@@ -154,11 +155,20 @@ public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3Obje
         return ControllerResult.atomicOf(records, response);
     }

-    public ControllerResult<Boolean> commitObject(long objectId, long objectSize) {
+    public ControllerResult<Errors> commitObject(long objectId, long objectSize) {
         S3Object object = this.objectsMetadata.get(objectId);
         if (object == null) {
             log.error("object {} not exist when commit wal object", objectId);
-            return ControllerResult.of(Collections.emptyList(), false);
+            return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
+        }
+        // verify the state
+        if (object.getS3ObjectState() == S3ObjectState.COMMITTED) {
+            log.warn("object {} already committed", objectId);
+            return ControllerResult.of(Collections.emptyList(), Errors.REDUNDANT_OPERATION);
+        }
+        if (object.getS3ObjectState() != S3ObjectState.PREPARED) {
+            log.error("object {} is not prepared but try to commit", objectId);
+            return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
         }
         S3ObjectRecord record = new S3ObjectRecord()
             .setObjectId(objectId)
@@ -168,14 +178,14 @@ public ControllerResult<Boolean> commitObject(long objectId, long objectSize) {
             .setExpiredTimeInMs(object.getExpiredTimeInMs())
             .setCommittedTimeInMs(System.currentTimeMillis());
         return ControllerResult.of(List.of(
-            new ApiMessageAndVersion(record, (short) 0)), true);
+            new ApiMessageAndVersion(record, (short) 0)), Errors.NONE);
     }

     public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         for (Long objectId : objects) {
             S3Object object = this.objectsMetadata.get(objectId);
-            if (object == null) {
+            if (object == null || object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
                 log.error("object {} not exist when mark destroy object", objectId);
                 // TODO: Maybe we can ignore this situation, because this object is already destroyed ?
                 return ControllerResult.of(Collections.emptyList(), false);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index 73b3636864..a7f44ef9f8 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -362,19 +362,25 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
         long objectSize = data.objectSize();
         long orderId = data.orderId();
         List<ObjectStreamRange> streamRanges = data.objectStreamRanges();
+        List<Long> compactedObjectIds = data.compactedObjectIds();
         // commit object
-        ControllerResult<Boolean> commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize);
-        if (!commitResult.response()) {
+        ControllerResult<Errors> commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize);
+        if (commitResult.response() == Errors.OBJECT_NOT_EXIST) {
             log.error("[CommitWALObject]: object {} not exist when commit wal object", objectId);
             resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code());
             return ControllerResult.of(Collections.emptyList(), resp);
         }
+        if (commitResult.response() == Errors.REDUNDANT_OPERATION) {
+            // regard it as redundant commit operation, just return success
+            log.warn("[CommitWALObject]: object {} already committed", objectId);
+            return ControllerResult.of(Collections.emptyList(), resp);
+        }
         records.addAll(commitResult.records());
         // mark destroy compacted object
-        if (data.compactedObjectIds() != null && !data.compactedObjectIds().isEmpty()) {
-            ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(data.compactedObjectIds());
+        if (compactedObjectIds != null && !compactedObjectIds.isEmpty()) {
+            ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(compactedObjectIds);
             if (!destroyResult.response()) {
-                log.error("[CommitWALObject]: Mark destroy compacted objects: {} failed", data.compactedObjectIds());
+                log.error("[CommitWALObject]: Mark destroy compacted objects: {} failed", compactedObjectIds);
                 resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
                 return ControllerResult.of(Collections.emptyList(), resp);
             }
@@ -408,10 +414,12 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
             long endOffset = obj.endOffset();
             records.add(new S3StreamObject(obj.objectId(), obj.objectSize(), streamId, startOffset, endOffset).toRecord());
         });
-        // generate compacted objects' remove record
-        data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
-            .setObjectId(id), (short) 0)));
-        log.info("[CommitWALObject]: broker: {} commit wal object: {} success, compacted objects: {}", brokerId, objectId, data.compactedObjectIds());
+        if (compactedObjectIds != null && !compactedObjectIds.isEmpty()) {
+            // generate compacted objects' remove record
+            compactedObjectIds.forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
+                .setObjectId(id), (short) 0)));
+        }
+        log.info("[CommitWALObject]: broker: {} commit wal object: {} success, compacted objects: {}", brokerId, objectId, compactedObjectIds);
         return ControllerResult.atomicOf(records, resp);
     }

@@ -426,12 +434,17 @@ public ControllerResult<CommitStreamObjectResponseData> commitStreamObject(Commi
         CommitStreamObjectResponseData resp = new CommitStreamObjectResponseData();

         // commit object
-        ControllerResult<Boolean> commitResult = this.s3ObjectControlManager.commitObject(streamObjectId, objectSize);
-        if (!commitResult.response()) {
+        ControllerResult<Errors> commitResult = this.s3ObjectControlManager.commitObject(streamObjectId, objectSize);
+        if (commitResult.response() == Errors.OBJECT_NOT_EXIST) {
             log.error("[CommitStreamObject]: object {} not exist when commit stream object", streamObjectId);
             resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code());
             return ControllerResult.of(Collections.emptyList(), resp);
         }
+        if (commitResult.response() == Errors.REDUNDANT_OPERATION) {
+            // regard it as redundant commit operation, just return success
+            log.warn("[CommitStreamObject]: object {} already committed", streamObjectId);
+            return ControllerResult.of(Collections.emptyList(), resp);
+        }

         // mark destroy compacted object
         if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
index 062153e0a8..b191ae3e61 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.controller;

 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -158,18 +157,23 @@ public void testCommitObject() {
         prepareOneObject(60 * 1000);

         // 2. commit an object which not exist in controller
-        ControllerResult<Boolean> result1 = manager.commitObject(1, 1024);
-        assertFalse(result1.response());
+        ControllerResult<Errors> result1 = manager.commitObject(1, 1024);
+        assertEquals(Errors.OBJECT_NOT_EXIST, result1.response());
         assertEquals(0, result1.records().size());

         // 3. commit an valid object
-        ControllerResult<Boolean> result2 = manager.commitObject(0, 1024);
-        assertTrue(result2.response());
+        ControllerResult<Errors> result2 = manager.commitObject(0, 1024);
+        assertEquals(Errors.NONE, result2.response());
         assertEquals(1, result2.records().size());
         S3ObjectRecord record = (S3ObjectRecord) result2.records().get(0).message();
         manager.replay(record);

-        // 4. verify the object is committed
+        // 4. commit again
+        ControllerResult<Errors> result3 = manager.commitObject(0, 1024);
+        assertEquals(Errors.REDUNDANT_OPERATION, result3.response());
+        assertEquals(0, result3.records().size());
+
+        // 5. verify the object is committed
         assertEquals(1, manager.objectsMetadata().size());
         S3Object object = manager.objectsMetadata().get(0L);
         assertEquals(S3ObjectState.COMMITTED, object.getS3ObjectState());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
index 0e66c0f306..1a35012642 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
@@ -273,9 +273,9 @@ public void testCommitWalBasic() {
         Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).then(ink -> {
             long objectId = ink.getArgument(0);
             if (objectId == 1) {
-                return ControllerResult.of(Collections.emptyList(), false);
+                return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
             }
-            return ControllerResult.of(Collections.emptyList(), true);
+            return ControllerResult.of(Collections.emptyList(), Errors.NONE);
         });
         // 1. create and open stream_0
         CreateStreamRequestData request0 = new CreateStreamRequestData();
@@ -376,7 +376,7 @@ private void createAndOpenStream(int brokerId, long epoch) {

     @Test
     public void testCommitWalCompacted() {
-        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
+        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
         Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));

         // 1. create and open stream_0 and stream_1
@@ -449,7 +449,8 @@
         assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
         assertEquals(300L, streamsOffset.streamsOffset().get(1).endOffset());

-        // 6. commit a second level wal object which compact wal_0 and wal_1
+        // 6. commit an invalid wal object which contains the destroyed or not exist wal object
+        Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false));
         List<ObjectStreamRange> streamRanges2 = List.of(
             new ObjectStreamRange()
                 .setStreamId(STREAM0)
@@ -467,12 +468,25 @@
             .setBrokerId(BROKER0)
             .setObjectSize(999)
             .setObjectStreamRanges(streamRanges2)
-            .setCompactedObjectIds(List.of(0L, 1L));
+            .setCompactedObjectIds(List.of(0L, 1L, 10L));
         ControllerResult<CommitWALObjectResponseData> result6 = manager.commitWALObject(commitRequest2);
+        assertEquals(Errors.STREAM_INNER_ERROR.code(), result6.response().errorCode());
+        assertEquals(0, result6.records().size());
+        Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
+
+        // 7. commit a second level wal object which compact wal_0 and wal_1
+        commitRequest2 = new CommitWALObjectRequestData()
+            .setObjectId(2L)
+            .setOrderId(0L)
+            .setBrokerId(BROKER0)
+            .setObjectSize(999)
+            .setObjectStreamRanges(streamRanges2)
+            .setCompactedObjectIds(List.of(0L, 1L));
+        result6 = manager.commitWALObject(commitRequest2);
         assertEquals(Errors.NONE.code(), result6.response().errorCode());
         replay(manager, result6.records());

-        // 7. fetch range end offset
+        // 8. fetch range end offset
         streamsOffset = manager.getStreamsOffset(request);
         assertEquals(2, streamsOffset.streamsOffset().size());
         assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId());
@@ -482,15 +496,16 @@
         assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
         assertEquals(300L, streamsOffset.streamsOffset().get(1).endOffset());

-        // 8. verify compacted wal objects is removed
+        // 9. verify compacted wal objects is removed
         assertEquals(1, manager.brokersMetadata().get(BROKER0).walObjects().size());
         assertEquals(2, manager.brokersMetadata().get(BROKER0).walObjects().get(2L).objectId());
         assertEquals(0, manager.brokersMetadata().get(BROKER0).walObjects().get(2L).orderId());
+
     }

     @Test
     public void testCommitWalWithStreamObject() {
-        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
+        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
         Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));

         // 1. create and open stream_0 and stream_1
@@ -540,7 +555,7 @@

     @Test
     public void testCommitStreamObject() {
-        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
+        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
         Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));

         // 1. create and open stream_0 and stream_1
@@ -621,6 +636,19 @@
         assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
         assertEquals(400L, streamsOffset.streamsOffset().get(1).endOffset());

+        // 6. compact a stream object from invalid source object
+        Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false));
+        streamObjectRequest = new CommitStreamObjectRequestData()
+            .setObjectId(5L)
+            .setStreamId(STREAM1)
+            .setStartOffset(400L)
+            .setEndOffset(1000L)
+            .setObjectSize(999)
+            .setSourceObjectIds(List.of(10L));
+        result2 = manager.commitStreamObject(streamObjectRequest);
+        assertEquals(Errors.STREAM_INNER_ERROR.code(), result2.response().errorCode());
+        replay(manager, result2.records());
+
-        // 6. verify stream objects
+        // 7. verify stream objects
         assertEquals(1, manager.streamsMetadata().get(STREAM1).streamObjects().size());
         assertEquals(4L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).objectId());
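
Editorial note (not part of the patch): the change above turns `commitObject` from a `Boolean` into a typed `Errors` result, so callers can tell "object was never prepared" apart from "object is already committed" and treat the latter (`REDUNDANT_OPERATION`) as success, which makes retried commits idempotent. A minimal, self-contained sketch of that state machine follows; `ObjectState` and `CommitCode` are hypothetical stand-ins for `S3ObjectState` and `Errors`, not the patch's API.

// Hedged sketch only: mirrors the state checks added to
// S3ObjectControlManager.commitObject in the diff above.
import java.util.HashMap;
import java.util.Map;

public class CommitStateSketch {
    enum ObjectState { PREPARED, COMMITTED, MARK_DESTROYED }
    enum CommitCode { NONE, OBJECT_NOT_EXIST, REDUNDANT_OPERATION }

    private final Map<Long, ObjectState> objects = new HashMap<>();

    void prepare(long objectId) {
        objects.put(objectId, ObjectState.PREPARED);
    }

    CommitCode commit(long objectId) {
        ObjectState state = objects.get(objectId);
        if (state == null) {
            return CommitCode.OBJECT_NOT_EXIST;       // never prepared
        }
        if (state == ObjectState.COMMITTED) {
            return CommitCode.REDUNDANT_OPERATION;    // duplicate commit; caller treats as success
        }
        if (state != ObjectState.PREPARED) {
            return CommitCode.OBJECT_NOT_EXIST;       // e.g. MARK_DESTROYED: not committable
        }
        objects.put(objectId, ObjectState.COMMITTED); // PREPARED -> COMMITTED
        return CommitCode.NONE;
    }

    public static void main(String[] args) {
        CommitStateSketch m = new CommitStateSketch();
        m.prepare(0L);
        System.out.println(m.commit(0L)); // NONE
        System.out.println(m.commit(0L)); // REDUNDANT_OPERATION (retry is idempotent)
        System.out.println(m.commit(1L)); // OBJECT_NOT_EXIST
    }
}

Distinguishing the two failure modes is what lets `commitWALObject` and `commitStreamObject` swallow duplicate commits while still failing hard on genuinely missing objects.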