diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index 21d06533f4..de07c2cb59 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -44,12 +44,12 @@ public class S3ObjectControlManager { private final S3Config config; /** - * The objectId of the next object to be applied. (start from 0) + * The objectId of the next object to be prepared. (start from 0) */ - private Long nextApplyObjectId = 0L; + private Long nextAssignedObjectId = 0L; // TODO: add timer task to periodically check if there are objects to be destroyed or expired - private final Queue appliedObjects; + private final Queue preparedObjects; private final Queue markDestroyedObjects; public S3ObjectControlManager( @@ -62,12 +62,12 @@ public S3ObjectControlManager( this.clusterId = clusterId; this.config = config; this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0); - this.appliedObjects = new LinkedList<>(); + this.preparedObjects = new LinkedList<>(); this.markDestroyedObjects = new LinkedList<>(); } - - public Long appliedObjectNum() { - return nextApplyObjectId; + + public Long nextAssignedObjectId() { + return nextAssignedObjectId; } public void replay(S3ObjectRecord record) { @@ -76,6 +76,7 @@ public void replay(S3ObjectRecord record) { S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey, record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(), S3ObjectState.fromByte(record.objectState())); objectsMetadata.put(record.objectId(), object); + nextAssignedObjectId = Math.max(nextAssignedObjectId, record.objectId() + 1); } public void replay(RemoveS3ObjectRecord record) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java index 81c4d25f47..fd91dd6939 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java @@ -82,13 +82,13 @@ public void onApply() { if (this.s3ObjectState != S3ObjectState.UNINITIALIZED) { throw new IllegalStateException("Object is not in UNINITIALIZED state"); } - this.s3ObjectState = S3ObjectState.APPLIED; + this.s3ObjectState = S3ObjectState.PREPARED; this.appliedTimeInMs = Optional.of(System.currentTimeMillis()); } public void onCreate(S3ObjectCommitContext createContext) { // TODO: decide fetch object metadata from S3 or let broker send it to controller - if (this.s3ObjectState != S3ObjectState.APPLIED) { + if (this.s3ObjectState != S3ObjectState.PREPARED) { throw new IllegalStateException("Object is not in APPLIED state"); } this.s3ObjectState = S3ObjectState.COMMITTED; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java index cdffc10f49..99e3bff9ab 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java @@ -19,7 +19,7 @@ public enum S3ObjectState { UNINITIALIZED, - APPLIED, + PREPARED, COMMITTED, MARK_DESTROYED, DESTROYED; diff --git a/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java index d4422bd0d2..d0616cf541 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java @@ -47,7 +47,7 @@ public class S3ObjectsImageTest { static { Map map = new HashMap<>(); for (int i = 0; i < 4; i++) { - SimplifiedS3Object object = new SimplifiedS3Object(i, S3ObjectState.APPLIED); + SimplifiedS3Object object = new SimplifiedS3Object(i, S3ObjectState.PREPARED); map.put(object.objectId(), object); } IMAGE1 = new S3ObjectsImage(map); @@ -69,7 +69,7 @@ public class S3ObjectsImageTest { .setObjectId(3L), (short) 0)); DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord(). setObjectId(4L). - setObjectState((byte) S3ObjectState.APPLIED.ordinal()), (short) 0)); + setObjectState((byte) S3ObjectState.PREPARED.ordinal()), (short) 0)); DELTA1 = new S3ObjectsDelta(IMAGE1); RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); @@ -77,7 +77,7 @@ public class S3ObjectsImageTest { map2.put(0L, new SimplifiedS3Object(0L, S3ObjectState.COMMITTED)); map2.put(1L, new SimplifiedS3Object(1L, S3ObjectState.COMMITTED)); map2.put(2L, new SimplifiedS3Object(2L, S3ObjectState.MARK_DESTROYED)); - map2.put(4L, new SimplifiedS3Object(4L, S3ObjectState.APPLIED)); + map2.put(4L, new SimplifiedS3Object(4L, S3ObjectState.PREPARED)); IMAGE2 = new S3ObjectsImage(map2); }