From 191281cb46b5679b5bf74bcc7b25b104cd0331f8 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Mon, 21 Aug 2023 15:14:45 +0800 Subject: [PATCH] feat(s3): add expired time for object 1. add expired time for object Signed-off-by: TheR1sing3un --- .../stream/StreamControlManager.java | 2 - .../kafka/metadata/stream/S3Object.java | 66 ++++++++++++------- .../kafka/metadata/stream/S3StreamObject.java | 24 +++---- .../kafka/metadata/stream/S3WALObject.java | 24 +++---- .../common/metadata/S3StreamObjectRecord.json | 14 ++-- .../common/metadata/WALObjectRecord.json | 14 ++-- .../image/S3StreamsMetadataImageTest.java | 18 ++--- 7 files changed, 96 insertions(+), 66 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 9d586e8bcb..7f0483717f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -65,6 +65,4 @@ public StreamControlManager( this.brokersMetadata = new TimelineHashMap<>(snapshotRegistry, 0); } - - } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java index c50d0fa33d..dd85f6dab1 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java @@ -31,11 +31,25 @@ public abstract class S3Object implements Comparable { protected Optional objectAddress = Optional.empty(); - protected Optional applyTimeInMs = Optional.empty(); - - protected Optional createTimeInMs = Optional.empty(); - - protected Optional destroyTimeInMs = Optional.empty(); + /** + * The time when broker apply the object + */ + protected Optional appliedTimeInMs = Optional.empty(); + + /** + * The time when the object will be expired if it is not be committed + */ + protected Optional expiredTimeInMs = Optional.empty(); + + /** + * The time when the object is committed + */ + protected Optional committedTimeInMs = Optional.empty(); + + /** + * The time when the object is destroyed + */ + protected Optional destroyedTimeInMs = Optional.empty(); protected S3ObjectState s3ObjectState = S3ObjectState.UNINITIALIZED; @@ -49,17 +63,19 @@ protected S3Object( final Long objectId, final Long objectSize, final String objectAddress, - final Long applyTimeInMs, - final Long createTimeInMs, - final Long destroyTimeInMs, + final Long appliedTimeInMs, + final Long expiredTimeInMs, + final Long committedTimeInMs, + final Long destroyedTimeInMs, final S3ObjectState s3ObjectState, final S3ObjectType objectType) { this.objectId = objectId; this.objectSize = Optional.of(objectSize); this.objectAddress = Optional.of(objectAddress); - this.applyTimeInMs = Optional.of(applyTimeInMs); - this.createTimeInMs = Optional.of(createTimeInMs); - this.destroyTimeInMs = Optional.of(destroyTimeInMs); + this.appliedTimeInMs = Optional.of(appliedTimeInMs); + this.expiredTimeInMs = Optional.of(expiredTimeInMs); + this.committedTimeInMs = Optional.of(committedTimeInMs); + this.destroyedTimeInMs = Optional.of(destroyedTimeInMs); this.objectType = objectType; this.s3ObjectState = s3ObjectState; } @@ -69,16 +85,16 @@ public void onApply() { throw new IllegalStateException("Object is not in UNINITIALIZED state"); } this.s3ObjectState = S3ObjectState.APPLIED; - this.applyTimeInMs = Optional.of(System.currentTimeMillis()); + this.appliedTimeInMs = Optional.of(System.currentTimeMillis()); } - public void onCreate(S3ObjectCreateContext createContext) { + public void onCreate(S3ObjectCommitContext createContext) { // TODO: decide fetch object metadata from S3 or let broker send it to controller if (this.s3ObjectState != S3ObjectState.APPLIED) { throw new IllegalStateException("Object is not in APPLIED state"); } this.s3ObjectState = S3ObjectState.CREATED; - this.createTimeInMs = Optional.of(createContext.createTimeInMs); + this.committedTimeInMs = Optional.of(createContext.committedTimeInMs); this.objectSize = Optional.of(createContext.objectSize); this.objectAddress = Optional.of(createContext.objectAddress); this.objectType = createContext.objectType; @@ -103,19 +119,19 @@ public S3ObjectType getObjectType() { return objectType; } - public class S3ObjectCreateContext { + public class S3ObjectCommitContext { - private final Long createTimeInMs; + private final Long committedTimeInMs; private final Long objectSize; private final String objectAddress; private final S3ObjectType objectType; - public S3ObjectCreateContext( - final Long createTimeInMs, + public S3ObjectCommitContext( + final Long committedTimeInMs, final Long objectSize, final String objectAddress, final S3ObjectType objectType) { - this.createTimeInMs = createTimeInMs; + this.committedTimeInMs = committedTimeInMs; this.objectSize = objectSize; this.objectAddress = objectAddress; this.objectType = objectType; @@ -156,16 +172,16 @@ public Optional getObjectAddress() { return objectAddress; } - public Optional getApplyTimeInMs() { - return applyTimeInMs; + public Optional getAppliedTimeInMs() { + return appliedTimeInMs; } - public Optional getCreateTimeInMs() { - return createTimeInMs; + public Optional getCommittedTimeInMs() { + return committedTimeInMs; } - public Optional getDestroyTimeInMs() { - return destroyTimeInMs; + public Optional getDestroyedTimeInMs() { + return destroyedTimeInMs; } public S3ObjectState getS3ObjectState() { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java index 6f93c6536b..218bf44702 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java @@ -30,12 +30,12 @@ public S3StreamObject(final Long objectId) { } @Override - public void onCreate(S3ObjectCreateContext createContext) { + public void onCreate(S3ObjectCommitContext createContext) { super.onCreate(createContext); - if (!(createContext instanceof StreamObjectCreateContext)) { + if (!(createContext instanceof StreamObjectCommitContext)) { throw new IllegalArgumentException(); } - this.streamIndex = ((StreamObjectCreateContext) createContext).streamIndex; + this.streamIndex = ((StreamObjectCommitContext) createContext).streamIndex; } @Override @@ -49,11 +49,11 @@ public int compareTo(S3Object o) { return res == 0 ? this.streamIndex.getStartOffset().compareTo(s3StreamObject.streamIndex.getStartOffset()) : res; } - class StreamObjectCreateContext extends S3ObjectCreateContext { + class StreamObjectCommitContext extends S3ObjectCommitContext { private final S3ObjectStreamIndex streamIndex; - public StreamObjectCreateContext( + public StreamObjectCommitContext( final Long createTimeInMs, final Long objectSize, final String objectAddress, @@ -74,9 +74,10 @@ public ApiMessageAndVersion toRecord() { .setStreamId(streamIndex.getStreamId()) .setObjectState((byte) s3ObjectState.ordinal()) .setObjectType((byte) objectType.ordinal()) - .setApplyTimeInMs(applyTimeInMs.get()) - .setCreateTimeInMs(createTimeInMs.get()) - .setDestroyTimeInMs(destroyTimeInMs.get()) + .setAppliedTimeInMs(appliedTimeInMs.get()) + .setExpiredTimeInMs(expiredTimeInMs.get()) + .setCommittedTimeInMs(committedTimeInMs.get()) + .setDestroyedTimeInMs(destroyedTimeInMs.get()) .setObjectSize(objectSize.get()) .setStartOffset(streamIndex.getStartOffset()) .setEndOffset(streamIndex.getEndOffset()), (short) 0); @@ -86,9 +87,10 @@ public static S3StreamObject of(S3StreamObjectRecord record) { S3StreamObject s3StreamObject = new S3StreamObject(record.objectId()); s3StreamObject.objectType = S3ObjectType.fromByte(record.objectType()); s3StreamObject.s3ObjectState = S3ObjectState.fromByte(record.objectState()); - s3StreamObject.applyTimeInMs = Optional.of(record.applyTimeInMs()); - s3StreamObject.createTimeInMs = Optional.of(record.createTimeInMs()); - s3StreamObject.destroyTimeInMs = Optional.of(record.destroyTimeInMs()); + s3StreamObject.appliedTimeInMs = Optional.of(record.appliedTimeInMs()); + s3StreamObject.expiredTimeInMs = Optional.of(record.expiredTimeInMs()); + s3StreamObject.committedTimeInMs = Optional.of(record.committedTimeInMs()); + s3StreamObject.destroyedTimeInMs = Optional.of(record.destroyedTimeInMs()); s3StreamObject.objectSize = Optional.of(record.objectSize()); s3StreamObject.streamIndex = new S3ObjectStreamIndex(record.streamId(), record.startOffset(), record.endOffset()); return s3StreamObject; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java index 13e48d2312..6871688630 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java @@ -39,13 +39,14 @@ private S3WALObject( final Long objectSize, final String objectAddress, final Long applyTimeInMs, - final Long createTimeInMs, + final Long expiredTimeImMs, + final Long commitTimeInMs, final Long destroyTimeInMs, final S3ObjectState s3ObjectState, final S3ObjectType objectType, final Integer brokerId, final List streamsIndex) { - super(objectId, objectSize, objectAddress, applyTimeInMs, createTimeInMs, destroyTimeInMs, s3ObjectState, objectType); + super(objectId, objectSize, objectAddress, applyTimeInMs, expiredTimeImMs, commitTimeInMs, destroyTimeInMs, s3ObjectState, objectType); this.objectType = objectType; this.brokerId = brokerId; this.streamsIndex = streamsIndex.stream().collect( @@ -53,22 +54,22 @@ private S3WALObject( } @Override - public void onCreate(S3ObjectCreateContext createContext) { + public void onCreate(S3ObjectCommitContext createContext) { super.onCreate(createContext); - if (!(createContext instanceof WALObjectCreateContext)) { + if (!(createContext instanceof WALObjectCommitContext)) { throw new IllegalArgumentException(); } - WALObjectCreateContext walCreateContext = (WALObjectCreateContext) createContext; + WALObjectCommitContext walCreateContext = (WALObjectCommitContext) createContext; this.streamsIndex = walCreateContext.streamIndexList.stream().collect(Collectors.toMap(S3ObjectStreamIndex::getStreamId, index -> index)); this.brokerId = walCreateContext.brokerId; } - class WALObjectCreateContext extends S3ObjectCreateContext { + class WALObjectCommitContext extends S3ObjectCommitContext { private final List streamIndexList; private final Integer brokerId; - public WALObjectCreateContext( + public WALObjectCommitContext( final Long createTimeInMs, final Long objectSize, final String objectAddress, @@ -86,9 +87,10 @@ public ApiMessageAndVersion toRecord() { .setObjectId(objectId) .setObjectState((byte) s3ObjectState.ordinal()) .setObjectType((byte) objectType.ordinal()) - .setApplyTimeInMs(applyTimeInMs.get()) - .setCreateTimeInMs(createTimeInMs.get()) - .setDestroyTimeInMs(destroyTimeInMs.get()) + .setAppliedTimeInMs(appliedTimeInMs.get()) + .setExpiredTimeInMs(expiredTimeInMs.get()) + .setCommittedTimeInMs(committedTimeInMs.get()) + .setDestroyedTimeInMs(destroyedTimeInMs.get()) .setObjectSize(objectSize.get()) .setStreamsIndex( streamsIndex.values().stream() @@ -99,7 +101,7 @@ public ApiMessageAndVersion toRecord() { public static S3WALObject of(WALObjectRecord record) { S3WALObject s3WalObject = new S3WALObject( record.objectId(), record.objectSize(), null, - record.applyTimeInMs(), record.createTimeInMs(), record.destroyTimeInMs(), + record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(), S3ObjectState.fromByte(record.objectState()), S3ObjectType.fromByte(record.objectType()), record.brokerId(), record.streamsIndex().stream().map(S3ObjectStreamIndex::of).collect(Collectors.toList())); return s3WalObject; diff --git a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json index 4ae99fb78d..30fea687a4 100644 --- a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json +++ b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json @@ -51,19 +51,25 @@ "about": "The object size of the S3 object" }, { - "name": "ApplyTimeInMs", + "name": "AppliedTimeInMs", "type": "int64", "versions": "0+", "about": "The object be applied timestamp" }, { - "name": "CreateTimeInMs", + "name": "ExpiredTimeInMs", "type": "int64", "versions": "0+", - "about": "The object be created timestamp" + "about": "The object be expired timestamp" }, { - "name": "DestroyTimeInMs", + "name": "CommittedTimeInMs", + "type": "int64", + "versions": "0+", + "about": "The object be committed timestamp" + }, + { + "name": "DestroyedTimeInMs", "type": "int64", "versions": "0+", "about": "The object be destroyed timestamp" diff --git a/metadata/src/main/resources/common/metadata/WALObjectRecord.json b/metadata/src/main/resources/common/metadata/WALObjectRecord.json index 51b0a2316a..b7715b6257 100644 --- a/metadata/src/main/resources/common/metadata/WALObjectRecord.json +++ b/metadata/src/main/resources/common/metadata/WALObjectRecord.json @@ -39,19 +39,25 @@ "about": "The object size of the S3 object" }, { - "name": "ApplyTimeInMs", + "name": "AppliedTimeInMs", "type": "int64", "versions": "0+", "about": "The object be applied timestamp" }, { - "name": "CreateTimeInMs", + "name": "ExpiredTimeInMs", "type": "int64", "versions": "0+", - "about": "The object be created timestamp" + "about": "The object be expired timestamp" }, { - "name": "DestroyTimeInMs", + "name": "CommittedTimeInMs", + "type": "int64", + "versions": "0+", + "about": "The object be committed timestamp" + }, + { + "name": "DestroyedTimeInMs", "type": "int64", "versions": "0+", "about": "The object be destroyed timestamp" diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index c79b390483..1a038b864a 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -141,19 +141,19 @@ public void testBasicChange() { WALObjectRecord walObjectRecord0 = new WALObjectRecord() .setBrokerId(brokerId0) .setObjectId(0L) - .setApplyTimeInMs(System.currentTimeMillis()) + .setAppliedTimeInMs(System.currentTimeMillis()) .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) .setObjectState((byte) S3ObjectState.APPLIED.ordinal()); WALObjectRecord walObjectRecord1 = new WALObjectRecord() .setBrokerId(brokerId1) .setObjectId(1L) - .setApplyTimeInMs(System.currentTimeMillis()) + .setAppliedTimeInMs(System.currentTimeMillis()) .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) .setObjectState((byte) S3ObjectState.APPLIED.ordinal()); WALObjectRecord walObjectRecord2 = new WALObjectRecord() .setBrokerId(brokerId1) .setObjectId(2L) - .setApplyTimeInMs(System.currentTimeMillis()) + .setAppliedTimeInMs(System.currentTimeMillis()) .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) .setObjectState((byte) S3ObjectState.APPLIED.ordinal()); records.clear(); @@ -197,7 +197,7 @@ public void testBasicChange() { .setBrokerId(brokerId1) .setObjectId(1L) .setObjectSize(WAL_LOOSE_SIZE) - .setCreateTimeInMs(System.currentTimeMillis()) + .setCommittedTimeInMs(System.currentTimeMillis()) .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) .setStreamsIndex(streamIndicesInWALObject1.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect( Collectors.toList())) @@ -211,7 +211,7 @@ public void testBasicChange() { .setBrokerId(brokerId1) .setObjectId(2L) .setObjectSize(WAL_LOOSE_SIZE) - .setCreateTimeInMs(System.currentTimeMillis()) + .setCommittedTimeInMs(System.currentTimeMillis()) .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) .setStreamsIndex(streamIndicesInWALObject2.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect( Collectors.toList())) @@ -284,9 +284,9 @@ public void testBasicChange() { .setObjectId(3L) .setBrokerId(brokerId1) .setObjectType((byte) S3ObjectType.WAL_MINOR.ordinal()) - .setCreateTimeInMs(System.currentTimeMillis()) + .setCommittedTimeInMs(System.currentTimeMillis()) .setObjectState((byte) S3ObjectState.CREATED.ordinal()) - .setApplyTimeInMs(System.currentTimeMillis()) + .setAppliedTimeInMs(System.currentTimeMillis()) .setObjectSize(WAL_MINOR_COMPACT_SIZE) .setStreamsIndex(streamIndicesInWALObject3.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect( Collectors.toList())); @@ -335,7 +335,7 @@ public void testBasicChange() { .setStreamId(streamId0) .setObjectSize(STREAM_OBJECT_SIZE) .setObjectType((byte) S3ObjectType.STREAM.ordinal()) - .setCreateTimeInMs(System.currentTimeMillis()) + .setCommittedTimeInMs(System.currentTimeMillis()) .setStartOffset(s3ObjectStreamIndex4.getStartOffset()) .setEndOffset(s3ObjectStreamIndex4.getEndOffset()); S3StreamObjectRecord streamObjectRecord5 = new S3StreamObjectRecord() @@ -343,7 +343,7 @@ public void testBasicChange() { .setStreamId(streamId1) .setObjectSize(STREAM_OBJECT_SIZE) .setObjectType((byte) S3ObjectType.STREAM.ordinal()) - .setCreateTimeInMs(System.currentTimeMillis()) + .setCommittedTimeInMs(System.currentTimeMillis()) .setStartOffset(s3ObjectStreamIndex5.getStartOffset()) .setEndOffset(s3ObjectStreamIndex5.getEndOffset()); RemoveWALObjectRecord removeWALObjectRecord3 = new RemoveWALObjectRecord()