diff --git a/clients/src/main/resources/common/message/CommitWALObjectRequest.json b/clients/src/main/resources/common/message/CommitWALObjectRequest.json
index 0a53e0586e..48db9f3efc 100644
--- a/clients/src/main/resources/common/message/CommitWALObjectRequest.json
+++ b/clients/src/main/resources/common/message/CommitWALObjectRequest.json
@@ -37,6 +37,12 @@
         "versions": "0+",
         "about": "The ID of the WAL S3 object to commit"
       },
+      {
+        "name": "OrderId",
+        "type": "int64",
+        "versions": "0+",
+        "about": "The order ID of the WAL S3 object"
+      },
       {
         "name": "ObjectSize",
         "type": "int64",
@@ -86,13 +92,13 @@
         "name": "ObjectId",
         "type": "int64",
         "versions": "0+",
-        "about": "The ID of the WAL S3 object to commit"
+        "about": "The ID of the Stream object to commit"
       },
       {
         "name": "ObjectSize",
         "type": "int64",
         "versions": "0+",
-        "about": "The size of the WAL S3 object to commit"
+        "about": "The size of the Stream object to commit"
       },
       {
         "name": "StreamId",
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 85c1e5a26a..bf353f5ce6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -65,7 +65,10 @@
 import org.apache.kafka.common.message.UpdateFeaturesRequestData;
 import org.apache.kafka.common.message.UpdateFeaturesResponseData;
 import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
+import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -75,12 +78,23 @@
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
+import org.apache.kafka.common.metadata.RangeRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveBrokerWALMetadataRecord;
+import org.apache.kafka.common.metadata.RemoveRangeRecord;
+import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
+import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
+import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
+import org.apache.kafka.common.metadata.S3ObjectRecord;
+import org.apache.kafka.common.metadata.S3StreamObjectRecord;
+import org.apache.kafka.common.metadata.S3StreamRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.metadata.WALObjectRecord;
 import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -1451,6 +1465,53 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
             case ZK_MIGRATION_STATE_RECORD:
                 // TODO handle this
                 break;
+
+            // Kafka on S3 inject start
+
+            case S3_STREAM_RECORD:
+                streamControlManager.replay((S3StreamRecord) message);
+                break;
+            case REMOVE_S3_STREAM_RECORD:
+                streamControlManager.replay((RemoveS3StreamRecord) message);
+                break;
+            case RANGE_RECORD:
+                streamControlManager.replay((RangeRecord) message);
+                break;
+            case REMOVE_RANGE_RECORD:
+                streamControlManager.replay((RemoveRangeRecord) message);
+                break;
+            case S3_STREAM_OBJECT_RECORD:
+                streamControlManager.replay((S3StreamObjectRecord) message);
+                break;
+            case REMOVE_S3_STREAM_OBJECT_RECORD:
+                streamControlManager.replay((RemoveS3StreamObjectRecord) message);
+                break;
+            case WALOBJECT_RECORD:
+                streamControlManager.replay((WALObjectRecord) message);
+                break;
+            case REMOVE_WALOBJECT_RECORD:
+                streamControlManager.replay((RemoveWALObjectRecord) message);
+                break;
+            case S3_OBJECT_RECORD:
+                s3ObjectControlManager.replay((S3ObjectRecord) message);
+                break;
+            case REMOVE_S3_OBJECT_RECORD:
+                s3ObjectControlManager.replay((RemoveS3ObjectRecord) message);
+                break;
+            case ASSIGNED_STREAM_ID_RECORD:
+                streamControlManager.replay((AssignedStreamIdRecord) message);
+                break;
+            case ASSIGNED_S3_OBJECT_ID_RECORD:
+                s3ObjectControlManager.replay((AssignedS3ObjectIdRecord) message);
+                break;
+            case BROKER_WALMETADATA_RECORD:
+                streamControlManager.replay((BrokerWALMetadataRecord) message);
+                break;
+            case REMOVE_BROKER_WALMETADATA_RECORD:
+                streamControlManager.replay((RemoveBrokerWALMetadataRecord) message);
+                break;
+
+            // Kafka on S3 inject end
             default:
                 throw new RuntimeException("Unhandled record type " + type);
         }
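For orientation: the new switch arms follow the controller's existing replay pattern. A control manager emits records wrapped in ApiMessageAndVersion; once a batch is committed to the metadata log, replay(...) casts each message back to its concrete type and hands it to the manager that owns the corresponding state. A minimal sketch of that round trip for the new WAL object record, using only the setters this patch introduces (the standalone wiring below is illustrative, not the controller's actual plumbing):

    // Illustrative only: the emit/replay round trip for a WALObjectRecord.
    import java.util.List;
    import org.apache.kafka.common.metadata.WALObjectRecord;
    import org.apache.kafka.server.common.ApiMessageAndVersion;

    public class ReplayRoundTripSketch {
        public static void main(String[] args) {
            // A control manager emits the record together with its schema version.
            ApiMessageAndVersion committed = new ApiMessageAndVersion(
                new WALObjectRecord()
                    .setObjectId(2L)
                    .setOrderId(0L)          // see S3WALObject below: order ids survive compaction
                    .setBrokerId(0)
                    .setStreamsIndex(List.of()),
                (short) 0);
            // After the batch is committed to the metadata log, the switch above
            // dispatches it back to the owning manager:
            // streamControlManager.replay((WALObjectRecord) committed.message());
        }
    }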
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
index 3ea1b86e3c..1ce2e6823c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
@@ -172,6 +172,27 @@ public ControllerResult<Boolean> commitObject(long objectId, long objectSize) {
             new ApiMessageAndVersion(record, (short) 0)), true);
     }
 
+    public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Long objectId : objects) {
+            S3Object object = this.objectsMetadata.get(objectId);
+            if (object == null) {
+                log.error("object {} does not exist when marking destroy objects", objectId);
+                // TODO: maybe we can ignore this case, since the object has already been destroyed?
+                return ControllerResult.of(Collections.emptyList(), false);
+            }
+            S3ObjectRecord record = new S3ObjectRecord()
+                .setObjectId(objectId)
+                .setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
+                .setPreparedTimeInMs(object.getPreparedTimeInMs())
+                .setExpiredTimeInMs(object.getExpiredTimeInMs())
+                .setCommittedTimeInMs(object.getCommittedTimeInMs())
+                .setMarkDestroyedTimeInMs(System.currentTimeMillis());
+            records.add(new ApiMessageAndVersion(record, (short) 0));
+        }
+        return ControllerResult.atomicOf(records, true);
+    }
+
     public void replay(AssignedS3ObjectIdRecord record) {
         nextAssignedObjectId.set(record.assignedS3ObjectId() + 1);
     }
@@ -180,7 +201,7 @@ public void replay(S3ObjectRecord record) {
         GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId());
         String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
         S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey,
-            record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(),
+            record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(),
             S3ObjectState.fromByte(record.objectState()));
         objectsMetadata.put(record.objectId(), object);
         // TODO: recover the prepared objects and mark destroyed objects when restart the controller
@@ -215,7 +236,7 @@ public ControllerResult checkS3ObjectsLifecycle() {
                 .setPreparedTimeInMs(obj.getPreparedTimeInMs())
                 .setExpiredTimeInMs(obj.getExpiredTimeInMs())
                 .setCommittedTimeInMs(obj.getCommittedTimeInMs())
-                .setDestroyedTimeInMs(obj.getDestroyedTimeInMs());
+                .setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs());
             // generate the records which mark the expired objects as destroyed
             records.add(new ApiMessageAndVersion(record, (short) 0));
             // generate the records which listener reply for the object-destroy events
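The rename from destroyedTimeInMs to markDestroyedTimeInMs matches the lifecycle this manager tracks: an object id is first prepared with an expiry deadline, the object is committed once a broker has uploaded it, and compaction only marks it destroyed; physically deleting it from S3 happens later. A sketch of the timestamps one object's record accumulates, with illustrative values; it assumes the S3ObjectState enum exposes PREPARED and COMMITTED constants alongside the UNINITIALIZED and MARK_DESTROYED states that appear in this patch, and that it lives in org.apache.kafka.metadata.stream next to S3Object:

    import org.apache.kafka.common.metadata.S3ObjectRecord;
    import org.apache.kafka.metadata.stream.S3ObjectState;

    public class S3ObjectLifecycleSketch {
        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            // 1. PREPARED: the id is reserved before any data is uploaded; if no
            //    commit arrives in time, checkS3ObjectsLifecycle() expires it.
            S3ObjectRecord record = new S3ObjectRecord()
                .setObjectId(0L)
                .setObjectState(S3ObjectState.PREPARED.toByte())
                .setPreparedTimeInMs(now)
                .setExpiredTimeInMs(now + 60_000L);
            // 2. COMMITTED: the broker uploaded the object and reported its size.
            record.setObjectState(S3ObjectState.COMMITTED.toByte())
                .setObjectSize(999L)
                .setCommittedTimeInMs(now + 1_000L);
            // 3. MARK_DESTROYED: markDestroyObjects(...) flags it after compaction;
            //    deleting the underlying S3 object is deferred to a background pass.
            record.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
                .setMarkDestroyedTimeInMs(now + 2_000L);
        }
    }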
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index 41ce96a463..d1b7096928 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -22,7 +22,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.message.CloseStreamRequestData;
 import org.apache.kafka.common.message.CloseStreamResponseData;
@@ -30,6 +29,7 @@
 import org.apache.kafka.common.message.CommitStreamObjectResponseData;
 import org.apache.kafka.common.message.CommitWALObjectRequestData;
 import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange;
+import org.apache.kafka.common.message.CommitWALObjectRequestData.StreamObject;
 import org.apache.kafka.common.message.CommitWALObjectResponseData;
 import org.apache.kafka.common.message.CreateStreamRequestData;
 import org.apache.kafka.common.message.CreateStreamResponseData;
@@ -43,8 +43,12 @@
 import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
 import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
 import org.apache.kafka.common.metadata.RangeRecord;
+import org.apache.kafka.common.metadata.RemoveBrokerWALMetadataRecord;
 import org.apache.kafka.common.metadata.RemoveRangeRecord;
+import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
+import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
+import org.apache.kafka.common.metadata.S3StreamObjectRecord;
 import org.apache.kafka.common.metadata.S3StreamRecord;
 import org.apache.kafka.common.metadata.WALObjectRecord;
 import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex;
@@ -59,7 +63,6 @@
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
-import org.apache.kafka.timeline.TimelineHashSet;
 import org.apache.kafka.timeline.TimelineInteger;
 import org.apache.kafka.timeline.TimelineLong;
 import org.apache.kafka.timeline.TimelineObject;
@@ -79,7 +82,7 @@ public static class S3StreamMetadata {
     private TimelineLong startOffset;
     private TimelineObject<StreamState> currentState;
     private TimelineHashMap<Integer, RangeMetadata> ranges;
-    private TimelineHashSet<S3StreamObject> streamObjects;
+    private TimelineHashMap<Long, S3StreamObject> streamObjects;
 
     public S3StreamMetadata(long currentEpoch, int currentRangeIndex, long startOffset, StreamState currentState,
         SnapshotRegistry registry) {
@@ -91,7 +94,7 @@ public S3StreamMetadata(long currentEpoch, int currentRangeIndex, long startOffs
         this.startOffset.set(startOffset);
         this.currentState = new TimelineObject<StreamState>(registry, currentState);
         this.ranges = new TimelineHashMap<>(registry, 0);
-        this.streamObjects = new TimelineHashSet<>(registry, 0);
+        this.streamObjects = new TimelineHashMap<>(registry, 0);
     }
 
     public long currentEpoch() {
@@ -114,7 +117,11 @@ public Map<Integer, RangeMetadata> ranges() {
         return ranges;
     }
 
-    public Set<S3StreamObject> streamObjects() {
+    public RangeMetadata currentRangeMetadata() {
+        return ranges.get(currentRangeIndex.get());
+    }
+
+    public Map<Long, S3StreamObject> streamObjects() {
         return streamObjects;
     }
@@ -134,18 +141,18 @@ public String toString() {
 
     public static class BrokerS3WALMetadata {
         private int brokerId;
-        private TimelineHashSet<S3WALObject> walObjects;
+        private TimelineHashMap<Long, S3WALObject> walObjects;
 
         public BrokerS3WALMetadata(int brokerId, SnapshotRegistry registry) {
             this.brokerId = brokerId;
-            this.walObjects = new TimelineHashSet<>(registry, 0);
+            this.walObjects = new TimelineHashMap<>(registry, 0);
         }
 
         public int getBrokerId() {
             return brokerId;
         }
 
-        public TimelineHashSet<S3WALObject> walObjects() {
+        public TimelineHashMap<Long, S3WALObject> walObjects() {
             return walObjects;
         }
@@ -346,12 +353,12 @@ public ControllerResult<DeleteStreamResponseData> deleteStream(DeleteStreamReque
 
     public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALObjectRequestData data) {
         // TODO: deal with compacted objects, mark delete compacted object
         // TODO: deal with stream objects, replay streamObjectRecord to advance stream's end offset
-        // TODO: generate order id to ensure the order of all wal object
         CommitWALObjectResponseData resp = new CommitWALObjectResponseData();
         List<ApiMessageAndVersion> records = new ArrayList<>();
         long objectId = data.objectId();
         int brokerId = data.brokerId();
         long objectSize = data.objectSize();
+        long orderId = data.orderId();
         List<ObjectStreamRange> streamRanges = data.objectStreamRanges();
         // commit object
         ControllerResult<Boolean> commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize);
@@ -361,24 +368,48 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
             return ControllerResult.of(Collections.emptyList(), resp);
         }
         records.addAll(commitResult.records());
+        // mark the compacted objects as destroyed
+        if (data.compactedObjectIds() != null && !data.compactedObjectIds().isEmpty()) {
+            ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(data.compactedObjectIds());
+            if (!destroyResult.response()) {
+                log.error("Mark destroy compacted objects {} failed",
+                    data.compactedObjectIds().stream().map(String::valueOf).collect(Collectors.joining(",")));
+                resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
+                return ControllerResult.of(Collections.emptyList(), resp);
+            }
+            records.addAll(destroyResult.records());
+        }
+
         List<S3ObjectStreamIndex> indexes = streamRanges.stream()
             .map(range -> new S3ObjectStreamIndex(range.streamId(), range.startOffset(), range.endOffset()))
             .collect(Collectors.toList());
         // update broker's wal object
         BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId);
         if (brokerMetadata == null) {
-            // first time commit wal object, create broker's metadata
+            // first time commit wal object, generate broker's metadata record
             records.add(new ApiMessageAndVersion(new BrokerWALMetadataRecord()
                 .setBrokerId(brokerId), (short) 0));
         }
-        // create broker's wal object
+        // generate broker's wal object record
         records.add(new ApiMessageAndVersion(new WALObjectRecord()
             .setObjectId(objectId)
+            .setOrderId(orderId)
             .setBrokerId(brokerId)
             .setStreamsIndex(
                 indexes.stream()
                     .map(S3ObjectStreamIndex::toRecordStreamIndex)
                    .collect(Collectors.toList())), (short) 0));
+        // generate the compacted objects' remove records
+        data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
+            .setObjectId(id), (short) 0)));
+        // generate stream object records
+        // TODO: deal with the lifecycle of a stream object's source objects: when and how should they be deleted?
+        List<StreamObject> streamObjects = data.streamObjects();
+        streamObjects.forEach(obj -> {
+            long streamId = obj.streamId();
+            long startOffset = obj.startOffset();
+            long endOffset = obj.endOffset();
+            records.add(new S3StreamObject(obj.objectId(), obj.objectSize(), streamId, startOffset, endOffset).toRecord());
+        });
         return ControllerResult.atomicOf(records, resp);
     }
@@ -461,6 +492,7 @@ public void replay(BrokerWALMetadataRecord record) {
     public void replay(WALObjectRecord record) {
         long objectId = record.objectId();
         int brokerId = record.brokerId();
+        long orderId = record.orderId();
         List<StreamIndex> streamIndexes = record.streamsIndex();
         BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId);
         if (brokerMetadata == null) {
@@ -474,7 +506,7 @@ public void replay(WALObjectRecord record) {
             .stream()
             .map(S3ObjectStreamIndex::of)
             .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId));
-        brokerMetadata.walObjects.add(new S3WALObject(objectId, brokerId, indexMap));
+        brokerMetadata.walObjects.put(objectId, new S3WALObject(objectId, brokerId, indexMap, orderId));
 
         // update range
         record.streamsIndex().forEach(index -> {
@@ -484,7 +516,7 @@ public void replay(WALObjectRecord record) {
                 // ignore it
                 return;
             }
-            RangeMetadata rangeMetadata = metadata.ranges().get(metadata.currentRangeIndex.get());
+            RangeMetadata rangeMetadata = metadata.currentRangeMetadata();
             if (rangeMetadata == null) {
                 // ignore it
                 return;
@@ -497,6 +529,60 @@ public void replay(WALObjectRecord record) {
         });
     }
 
+    public void replay(RemoveWALObjectRecord record) {
+        long objectId = record.objectId();
+        BrokerS3WALMetadata walMetadata = this.brokersMetadata.get(record.brokerId());
+        if (walMetadata == null) {
+            // should not happen
+            log.error("broker {} does not exist when replaying remove wal object record {}", record.brokerId(), record);
+            return;
+        }
+        walMetadata.walObjects.remove(objectId);
+    }
+
+    public void replay(S3StreamObjectRecord record) {
+        long objectId = record.objectId();
+        long streamId = record.streamId();
+        long startOffset = record.startOffset();
+        long endOffset = record.endOffset();
+        long objectSize = record.objectSize();
+
+        S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
+        if (streamMetadata == null) {
+            // should not happen
+            log.error("stream {} does not exist when replaying stream object record {}", streamId, record);
+            return;
+        }
+        streamMetadata.streamObjects.put(objectId, new S3StreamObject(objectId, objectSize, streamId, startOffset, endOffset));
+        // update range
+        RangeMetadata rangeMetadata = streamMetadata.currentRangeMetadata();
+        if (rangeMetadata == null) {
+            // ignore it
+            return;
+        }
+        if (rangeMetadata.endOffset() != startOffset) {
+            // ignore it
+            return;
+        }
+        rangeMetadata.setEndOffset(endOffset);
+    }
+
+    public void replay(RemoveS3StreamObjectRecord record) {
+        long streamId = record.streamId();
+        long objectId = record.objectId();
+        S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
+        if (streamMetadata == null) {
+            // should not happen
+            log.error("stream {} does not exist when replaying remove stream object record {}", streamId, record);
+            return;
+        }
+        streamMetadata.streamObjects.remove(objectId);
+    }
+
+    public void replay(RemoveBrokerWALMetadataRecord record) {
+        int brokerId = record.brokerId();
+        this.brokersMetadata.remove(brokerId);
+    }
+
     public Map<Long, S3StreamMetadata> streamsMetadata() {
         return streamsMetadata;
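To make the compaction path concrete: when a broker compacts WAL objects 0 and 1 into a new object 2, the commit request carries the new object, the merged stream ranges, and the ids it replaces; the new object inherits the order id of the first object in the batch, so sorting a broker's WAL objects by order id still yields the logical data order even though object ids keep growing. A sketch of such a request, restricted to the setters exercised by the tests below (all ids, offsets, epochs, and sizes are illustrative):

    import java.util.List;
    import org.apache.kafka.common.message.CommitWALObjectRequestData;
    import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange;

    public class CompactionCommitSketch {
        public static CommitWALObjectRequestData compactWal0AndWal1() {
            return new CommitWALObjectRequestData()
                .setObjectId(2L)           // the newly written compacted object
                .setOrderId(0L)            // inherited from wal_0, the first object in the batch
                .setBrokerId(0)
                .setObjectSize(999)
                .setObjectStreamRanges(List.of(
                    new ObjectStreamRange()
                        .setStreamId(0L)
                        .setStreamEpoch(1L)
                        .setStartOffset(0L)
                        .setEndOffset(200L)))   // the merged range of wal_0 and wal_1
                .setCompactedObjectIds(List.of(0L, 1L));
        }
    }

Given this request, commitWALObject emits one atomic batch: the commit record for object 2, the mark-destroy records for objects 0 and 1, the new WALObjectRecord carrying the order id, and one RemoveWALObjectRecord per replaced object.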
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
index 060975121e..e7b841d9e2 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
@@ -48,9 +48,9 @@ public class S3Object implements Comparable<S3Object> {
     private long committedTimeInMs;
 
     /**
-     * The time when the object is destroyed
+     * The time when the object is marked destroyed
      */
-    private long destroyedTimeInMs;
+    private long markDestroyedTimeInMs;
 
     private S3ObjectState s3ObjectState = S3ObjectState.UNINITIALIZED;
 
@@ -65,7 +65,7 @@ public S3Object(
         final long preparedTimeInMs,
         final long expiredTimeInMs,
         final long committedTimeInMs,
-        final long destroyedTimeInMs,
+        final long markDestroyedTimeInMs,
         final S3ObjectState s3ObjectState) {
         this.objectId = objectId;
         this.objectSize = objectSize;
@@ -73,7 +73,7 @@ public S3Object(
         this.preparedTimeInMs = preparedTimeInMs;
         this.expiredTimeInMs = expiredTimeInMs;
         this.committedTimeInMs = committedTimeInMs;
-        this.destroyedTimeInMs = destroyedTimeInMs;
+        this.markDestroyedTimeInMs = markDestroyedTimeInMs;
         this.s3ObjectState = s3ObjectState;
     }
 
@@ -85,13 +85,13 @@ public ApiMessageAndVersion toRecord() {
             .setPreparedTimeInMs(preparedTimeInMs)
             .setExpiredTimeInMs(expiredTimeInMs)
             .setCommittedTimeInMs(committedTimeInMs)
-            .setDestroyedTimeInMs(destroyedTimeInMs), (short) 0);
+            .setMarkDestroyedTimeInMs(markDestroyedTimeInMs), (short) 0);
     }
 
     public static S3Object of(S3ObjectRecord record) {
         return new S3Object(
             record.objectId(), record.objectSize(), null,
-            record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(),
+            record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(),
             S3ObjectState.fromByte(record.objectState()));
     }
 
@@ -137,8 +137,8 @@ public long getCommittedTimeInMs() {
         return committedTimeInMs;
     }
 
-    public long getDestroyedTimeInMs() {
-        return destroyedTimeInMs;
+    public long getMarkDestroyedTimeInMs() {
+        return markDestroyedTimeInMs;
     }
 
     public long getExpiredTimeInMs() {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
index 8ede6fd18f..640ca43e24 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
@@ -25,18 +25,16 @@ public class S3StreamObject {
 
     private final long objectId;
 
+    private final long objectSize;
+
     private final S3ObjectStreamIndex streamIndex;
 
-    public S3StreamObject(long objectId, long streamId, long startOffset, long endOffset) {
+    public S3StreamObject(long objectId, long objectSize, long streamId, long startOffset, long endOffset) {
         this.objectId = objectId;
+        this.objectSize = objectSize;
         this.streamIndex = new S3ObjectStreamIndex(streamId, startOffset, endOffset);
     }
 
-    public S3StreamObject(long objectId, S3ObjectStreamIndex streamIndex) {
-        this.objectId = objectId;
-        this.streamIndex = streamIndex;
-    }
-
     public S3ObjectStreamIndex streamIndex() {
         return streamIndex;
     }
@@ -58,8 +56,9 @@ public ApiMessageAndVersion toRecord() {
     }
 
     public static S3StreamObject of(S3StreamObjectRecord record) {
-        S3ObjectStreamIndex index = new S3ObjectStreamIndex(record.streamId(), record.startOffset(), record.endOffset());
-        S3StreamObject s3StreamObject = new S3StreamObject(record.objectId(), index);
+        S3StreamObject s3StreamObject = new S3StreamObject(
+            record.objectId(), record.objectSize(), record.streamId(),
+            record.startOffset(), record.endOffset());
         return s3StreamObject;
     }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
index 892f3bdbee..c68a35a69c 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
@@ -24,8 +24,16 @@
 import org.apache.kafka.common.metadata.WALObjectRecord;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
-public class S3WALObject {
-
+public class S3WALObject implements Comparable<S3WALObject> {
+
+    /**
+     * The order id of the object.
+     * Sort by this field to get the order of the objects that contain logically increasing streams.
+     *
+     * When a batch of objects is compacted into one compacted object,
+     * the compacted object's order id is assigned the order id of the first object in the batch.
+     */
+    private final long orderId;
     private final long objectId;
 
     private final int brokerId;
@@ -34,6 +42,12 @@ public class S3WALObject {
     private final S3ObjectType objectType = S3ObjectType.UNKNOWN;
 
     public S3WALObject(long objectId, int brokerId, final Map<Long, List<S3ObjectStreamIndex>> streamsIndex) {
+        // by default, the orderId is equal to the objectId
+        this(objectId, brokerId, streamsIndex, objectId);
+    }
+
+    public S3WALObject(long objectId, int brokerId, final Map<Long, List<S3ObjectStreamIndex>> streamsIndex, long orderId) {
+        this.orderId = orderId;
         this.objectId = objectId;
         this.brokerId = brokerId;
         this.streamsIndex = streamsIndex;
@@ -84,6 +98,10 @@ public S3ObjectType objectType() {
         return objectType;
     }
 
+    public long orderId() {
+        return orderId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -105,9 +123,15 @@ public int hashCode() {
     public String toString() {
         return "S3WALObject{" +
             "objectId=" + objectId +
+            ", orderId=" + orderId +
             ", brokerId=" + brokerId +
             ", streamsIndex=" + streamsIndex +
             ", objectType=" + objectType +
             '}';
     }
+
+    @Override
+    public int compareTo(S3WALObject o) {
+        return Long.compare(this.orderId, o.orderId);
+    }
 }
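Because S3WALObject now implements Comparable on the order id, recovering the logical write order of a broker's WAL objects is a plain sort, independent of the ever-growing object ids that compaction produces. A small sketch of that invariant (the stream index maps are left empty for brevity):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.metadata.stream.S3WALObject;

    public class OrderIdSortSketch {
        public static void main(String[] args) {
            List<S3WALObject> walObjects = new ArrayList<>(List.of(
                new S3WALObject(5L, 0, Map.of(), 3L),   // compacted object: inherits orderId 3
                new S3WALObject(4L, 0, Map.of())));     // plain object: orderId defaults to its objectId, 4
            Collections.sort(walObjects);
            // After sorting, object 5 (orderId 3) precedes object 4 (orderId 4),
            // even though its object id is larger.
        }
    }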
diff --git a/metadata/src/main/resources/common/metadata/S3ObjectRecord.json b/metadata/src/main/resources/common/metadata/S3ObjectRecord.json
index 85bb4cd18d..a0d4ddeb87 100644
--- a/metadata/src/main/resources/common/metadata/S3ObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/S3ObjectRecord.json
@@ -51,10 +51,10 @@
       "about": "The object be committed timestamp"
     },
     {
-      "name": "DestroyedTimeInMs",
+      "name": "MarkDestroyedTimeInMs",
       "type": "int64",
       "versions": "0+",
-      "about": "The object be destroyed timestamp"
+      "about": "The timestamp when the object was marked destroyed"
     },
     {
       "name": "ObjectState",
diff --git a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
index 95abda8988..ee3ed37f14 100644
--- a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
@@ -43,6 +43,12 @@
       "type": "int64",
       "versions": "0+",
       "about": "The object id of the S3 object"
+    },
+    {
+      "name": "ObjectSize",
+      "type": "int64",
+      "versions": "0+",
+      "about": "The size of the Stream object to commit"
     }
   ]
 }
\ No newline at end of file
diff --git a/metadata/src/main/resources/common/metadata/WALObjectRecord.json b/metadata/src/main/resources/common/metadata/WALObjectRecord.json
index a5fb9a859e..df4fd0bdd4 100644
--- a/metadata/src/main/resources/common/metadata/WALObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/WALObjectRecord.json
@@ -32,6 +32,12 @@
       "versions": "0+",
       "about": "The object id of the S3 object"
     },
+    {
+      "name": "OrderId",
+      "type": "int64",
+      "versions": "0+",
+      "about": "The order id of the S3 object"
+    },
     {
       "name": "StreamsIndex",
       "type": "[]StreamIndex",
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
index 0f12ad17c4..bf86c11ec0 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
@@ -20,6 +20,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 
 import java.util.Collections;
@@ -30,6 +31,7 @@
 import org.apache.kafka.common.message.CloseStreamResponseData;
 import org.apache.kafka.common.message.CommitWALObjectRequestData;
 import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange;
+import org.apache.kafka.common.message.CommitWALObjectRequestData.StreamObject;
 import org.apache.kafka.common.message.CommitWALObjectResponseData;
 import org.apache.kafka.common.message.CreateStreamRequestData;
 import org.apache.kafka.common.message.CreateStreamResponseData;
@@ -42,7 +44,10 @@
 import org.apache.kafka.common.metadata.MetadataRecordType;
 import org.apache.kafka.common.metadata.RangeRecord;
 import org.apache.kafka.common.metadata.RemoveRangeRecord;
+import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
+import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
+import org.apache.kafka.common.metadata.S3StreamObjectRecord;
 import org.apache.kafka.common.metadata.S3StreamRecord;
 import org.apache.kafka.common.metadata.WALObjectRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
@@ -262,7 +267,7 @@ public void testBasicOpenCloseStream() {
     }
 
     @Test
-    public void testCommitWal() {
+    public void testCommitWalBasic() {
         Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).then(ink -> {
             long objectId = ink.getArgument(0);
             if (objectId == 1) {
@@ -355,7 +360,197 @@ public void testCommitWal() {
         assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId());
         assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset());
         assertEquals(300L, streamsOffset.streamsOffset().get(0).endOffset());
+    }
 
+    private void createAndOpenStream(int brokerId, long epoch) {
+        CreateStreamRequestData request0 = new CreateStreamRequestData();
+        ControllerResult<CreateStreamResponseData> result0 = manager.createStream(request0);
+        replay(manager, result0.records());
+        long streamId = result0.response().streamId();
+        ControllerResult<OpenStreamResponseData> result1 = manager.openStream(
+            new OpenStreamRequestData().setStreamId(streamId).setStreamEpoch(epoch).setBrokerId(brokerId));
+        replay(manager, result1.records());
+    }
+
+    @Test
+    public void testCommitWalCompacted() {
+        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
+        Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
+
+        // 1. create and open stream_0 and stream_1
+        createAndOpenStream(BROKER0, EPOCH0);
+        createAndOpenStream(BROKER0, EPOCH0);
+
+        // 2. commit a first-level wal object covering stream_0 and stream_1
+        List<ObjectStreamRange> streamRanges0 = List.of(
+            new ObjectStreamRange()
+                .setStreamId(STREAM0)
+                .setStreamEpoch(EPOCH0)
+                .setStartOffset(0L)
+                .setEndOffset(100L),
+            new ObjectStreamRange()
+                .setStreamId(STREAM1)
+                .setStreamEpoch(EPOCH0)
+                .setStartOffset(0L)
+                .setEndOffset(200L));
+        CommitWALObjectRequestData commitRequest0 = new CommitWALObjectRequestData()
+            .setObjectId(0L)
+            .setOrderId(0L)
+            .setBrokerId(BROKER0)
+            .setObjectSize(999)
+            .setObjectStreamRanges(streamRanges0);
+        ControllerResult<CommitWALObjectResponseData> result4 = manager.commitWALObject(commitRequest0);
+        assertEquals(Errors.NONE.code(), result4.response().errorCode());
+        replay(manager, result4.records());
+
+        // 3. fetch range end offset
+        GetStreamsOffsetRequestData request = new GetStreamsOffsetRequestData()
+            .setStreamIds(List.of(STREAM0, STREAM1));
+        GetStreamsOffsetResponseData streamsOffset = manager.getStreamsOffset(request);
+        assertEquals(2, streamsOffset.streamsOffset().size());
+        assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId());
+        assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset());
+        assertEquals(100L, streamsOffset.streamsOffset().get(0).endOffset());
+        assertEquals(STREAM1, streamsOffset.streamsOffset().get(1).streamId());
+        assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
+        assertEquals(200L, streamsOffset.streamsOffset().get(1).endOffset());
+
+        // 4. keep committing first-level wal objects for stream_0 and stream_1
+        List<ObjectStreamRange> streamRanges1 = List.of(
+            new ObjectStreamRange()
+                .setStreamId(STREAM0)
+                .setStreamEpoch(EPOCH0)
+                .setStartOffset(100L)
+                .setEndOffset(200L),
+            new ObjectStreamRange()
+                .setStreamId(STREAM1)
+                .setStreamEpoch(EPOCH0)
+                .setStartOffset(200L)
+                .setEndOffset(300L));
+        CommitWALObjectRequestData commitRequest1 = new CommitWALObjectRequestData()
+            .setObjectId(1L)
+            .setOrderId(1L)
+            .setBrokerId(BROKER0)
+            .setObjectSize(999)
+            .setObjectStreamRanges(streamRanges1);
+        ControllerResult<CommitWALObjectResponseData> result5 = manager.commitWALObject(commitRequest1);
+        assertEquals(Errors.NONE.code(), result5.response().errorCode());
+        replay(manager, result5.records());
+
+        // 5. fetch range end offset
+        streamsOffset = manager.getStreamsOffset(request);
+        assertEquals(2, streamsOffset.streamsOffset().size());
+        assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId());
+        assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset());
+        assertEquals(200L, streamsOffset.streamsOffset().get(0).endOffset());
+        assertEquals(STREAM1, streamsOffset.streamsOffset().get(1).streamId());
+        assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
+        assertEquals(300L, streamsOffset.streamsOffset().get(1).endOffset());
+
+        // 6. commit a second-level wal object that compacts wal_0 and wal_1
+        List<ObjectStreamRange> streamRanges2 = List.of(
+            new ObjectStreamRange()
+                .setStreamId(STREAM0)
+                .setStreamEpoch(EPOCH0)
+                .setStartOffset(0L)
+                .setEndOffset(200L),
+            new ObjectStreamRange()
+                .setStreamId(STREAM1)
+                .setStreamEpoch(EPOCH0)
+                .setStartOffset(0L)
+                .setEndOffset(300L));
+        CommitWALObjectRequestData commitRequest2 = new CommitWALObjectRequestData()
+            .setObjectId(2L)
+            .setOrderId(0L)
+            .setBrokerId(BROKER0)
+            .setObjectSize(999)
+            .setObjectStreamRanges(streamRanges2)
+            .setCompactedObjectIds(List.of(0L, 1L));
+        ControllerResult<CommitWALObjectResponseData> result6 = manager.commitWALObject(commitRequest2);
+        assertEquals(Errors.NONE.code(), result6.response().errorCode());
+        replay(manager, result6.records());
+
+        // 7. fetch range end offset
+        streamsOffset = manager.getStreamsOffset(request);
+        assertEquals(2, streamsOffset.streamsOffset().size());
+        assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId());
+        assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset());
+        assertEquals(200L, streamsOffset.streamsOffset().get(0).endOffset());
+        assertEquals(STREAM1, streamsOffset.streamsOffset().get(1).streamId());
+        assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
+        assertEquals(300L, streamsOffset.streamsOffset().get(1).endOffset());
+
+        // 8. verify that the compacted wal objects are removed
+        assertEquals(1, manager.brokersMetadata().get(BROKER0).walObjects().size());
+        assertEquals(2, manager.brokersMetadata().get(BROKER0).walObjects().get(2L).objectId());
+        assertEquals(0, manager.brokersMetadata().get(BROKER0).walObjects().get(2L).orderId());
+    }
+
+    @Test
+    public void testCommitWalWithStreamObject() {
+        Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
+        Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
+
+        // 1. create and open stream_0 and stream_1
+        createAndOpenStream(BROKER0, EPOCH0);
+        createAndOpenStream(BROKER0, EPOCH0);
+
+        // 2. commit a wal object containing stream_0 data, plus a stream object for stream_1 that was split out of the wal object
+        List<ObjectStreamRange> streamRanges0 = List.of(
+            new ObjectStreamRange()
+                .setStreamId(STREAM0)
+                .setStreamEpoch(EPOCH0)
+                .setStartOffset(0L)
+                .setEndOffset(100L));
+        CommitWALObjectRequestData commitRequest0 = new CommitWALObjectRequestData()
+            .setObjectId(0L)
+            .setOrderId(0L)
+            .setBrokerId(BROKER0)
+            .setObjectSize(999)
+            .setObjectStreamRanges(streamRanges0)
+            .setStreamObjects(List.of(
+                new StreamObject()
+                    .setStreamId(STREAM1)
+                    .setObjectId(1L)
+                    .setObjectSize(999)
+                    .setStartOffset(0L)
+                    .setEndOffset(200L)
+            ));
+        ControllerResult<CommitWALObjectResponseData> result4 = manager.commitWALObject(commitRequest0);
+        assertEquals(Errors.NONE.code(), result4.response().errorCode());
+        replay(manager, result4.records());
+
+        // 3. fetch range end offset
+        GetStreamsOffsetRequestData request = new GetStreamsOffsetRequestData()
+            .setStreamIds(List.of(STREAM0, STREAM1));
+        GetStreamsOffsetResponseData streamsOffset = manager.getStreamsOffset(request);
+        assertEquals(2, streamsOffset.streamsOffset().size());
+        assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId());
+        assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset());
+        assertEquals(100L, streamsOffset.streamsOffset().get(0).endOffset());
+        assertEquals(STREAM1, streamsOffset.streamsOffset().get(1).streamId());
+        assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset());
+        assertEquals(200L, streamsOffset.streamsOffset().get(1).endOffset());
+
+        // 4. verify that the stream object was added
+        assertEquals(1, manager.streamsMetadata().get(STREAM1).streamObjects().size());
+    }
+
+    private void commitFirstLevelWalObject(long objectId, long orderId, long streamId, long startOffset, long endOffset, long epoch, int brokerId) {
+        List<ObjectStreamRange> streamRanges0 = List.of(new ObjectStreamRange()
+            .setStreamId(streamId)
+            .setStreamEpoch(epoch)
+            .setStartOffset(startOffset)
+            .setEndOffset(endOffset));
+        CommitWALObjectRequestData commitRequest0 = new CommitWALObjectRequestData()
+            .setObjectId(objectId)
+            .setOrderId(orderId)
+            .setBrokerId(brokerId)
+            .setObjectSize(999)
+            .setObjectStreamRanges(streamRanges0);
+        ControllerResult<CommitWALObjectResponseData> result = manager.commitWALObject(commitRequest0);
+        assertEquals(Errors.NONE.code(), result.response().errorCode());
+        replay(manager, result.records());
     }
 
     private void replay(StreamControlManager manager, List<ApiMessageAndVersion> records) {
@@ -385,6 +580,15 @@ private void replay(StreamControlManager manager, List<ApiMessageAndVersion> rec
             case WALOBJECT_RECORD:
                 manager.replay((WALObjectRecord) message);
                 break;
+            case REMOVE_WALOBJECT_RECORD:
+                manager.replay((RemoveWALObjectRecord) message);
+                break;
+            case S3_STREAM_OBJECT_RECORD:
+                manager.replay((S3StreamObjectRecord) message);
+                break;
+            case REMOVE_S3_STREAM_OBJECT_RECORD:
+                manager.replay((RemoveS3StreamObjectRecord) message);
+                break;
             default:
                 throw new IllegalStateException("Unknown metadata record type " + type);
         }
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
index 4c2ed0ab32..91d4155ff6 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
@@ -152,8 +152,8 @@ public void testStreamObjects() {
         // verify delta and check image's write
         S3StreamMetadataImage image1 = new S3StreamMetadataImage(
             STREAM0, 0L, 0L, Map.of(), Map.of(
-            0L, new S3StreamObject(0L, STREAM0, 0L, 100L),
-            1L, new S3StreamObject(1L, STREAM0, 100L, 200L)));
+            0L, new S3StreamObject(0L, 999, STREAM0, 0L, 100L),
+            1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
         assertEquals(image1, delta0.apply());
         testToImageAndBack(image1);
 
@@ -166,7 +166,7 @@ public void testStreamObjects() {
         // verify delta and check image's write
         S3StreamMetadataImage image2 = new S3StreamMetadataImage(
             STREAM0, 0L, 0L, Map.of(), Map.of(
-            1L, new S3StreamObject(1L, STREAM0, 100L, 200L)));
+            1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
         assertEquals(image2, delta1.apply());
         testToImageAndBack(image2);
     }
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index f3c9f6052a..19b58030ce 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -117,9 +117,9 @@ public void testGetObjects() {
             3, new RangeMetadata(STREAM0, 3L, 3, 420L, 500L, BROKER1),
             4, new RangeMetadata(STREAM0, 4L, 4, 500L, 600L, BROKER0));
         Map<Long, S3StreamObject> streamObjects = Map.of(
-            8L, new S3StreamObject(8, STREAM0, 10L, 100L),
-            9L, new S3StreamObject(9, STREAM0, 200L, 300L),
-            10L, new S3StreamObject(10, STREAM0, 300L, 400L));
+            8L, new S3StreamObject(8, GB, STREAM0, 10L, 100L),
+            9L, new S3StreamObject(9, GB, STREAM0, 200L, 300L),
+            10L, new S3StreamObject(10, GB, STREAM0, 300L, 400L));
         S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, 10, ranges, streamObjects);
         S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0,
             Map.of(STREAM0, streamImage), Map.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage));