newObjectsMetadata = new HashMap<>(image.objectsMetadata());
+ // put all new changed objects
+ newObjectsMetadata.putAll(changedObjects);
// remove all removed objects
removedObjectIds.forEach(newObjectsMetadata::remove);
- return new S3ObjectsImage(newObjectsMetadata);
+ return new S3ObjectsImage(currentAssignedObjectId, newObjectsMetadata);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java
index c7339f6b0b..530793c744 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java
@@ -20,31 +20,44 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
+import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
-import org.apache.kafka.metadata.stream.SimplifiedS3Object;
+import org.apache.kafka.metadata.stream.S3Object;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
/**
* Represents the S3 objects in the metadata image.
- *
+ *
* This class is thread-safe.
*/
public final class S3ObjectsImage {
+
public static final S3ObjectsImage EMPTY =
- new S3ObjectsImage(Collections.emptyMap());
+ new S3ObjectsImage(-1, Collections.emptyMap());
+
+ private long nextAssignedObjectId;
- private final Map objectsMetadata;
+ private final Map objectsMetadata;
- public S3ObjectsImage(final Map objectsMetadata) {
+ public S3ObjectsImage(long assignedObjectId, final Map objectsMetadata) {
+ this.nextAssignedObjectId = assignedObjectId + 1;
this.objectsMetadata = objectsMetadata;
}
- public Map objectsMetadata() {
+ public Map objectsMetadata() {
return objectsMetadata;
}
+ public long nextAssignedObjectId() {
+ return nextAssignedObjectId;
+ }
+
public void write(ImageWriter writer, ImageWriterOptions options) {
- objectsMetadata.values().stream().map(SimplifiedS3Object::toRecord).forEach(writer::write);
+ writer.write(
+ new ApiMessageAndVersion(
+ new AssignedS3ObjectIdRecord().setAssignedS3ObjectId(nextAssignedObjectId - 1), (short) 0));
+ objectsMetadata.values().stream().map(S3Object::toRecord).forEach(writer::write);
}
@Override
@@ -56,11 +69,12 @@ public boolean equals(Object o) {
return false;
}
S3ObjectsImage that = (S3ObjectsImage) o;
- return Objects.equals(objectsMetadata, that.objectsMetadata);
+ return this.nextAssignedObjectId == that.nextAssignedObjectId &&
+ objectsMetadata.equals(that.objectsMetadata);
}
@Override
public int hashCode() {
- return Objects.hash(objectsMetadata);
+ return Objects.hash(nextAssignedObjectId, objectsMetadata);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
index dafcfc4f90..97c3ae1ce1 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
@@ -77,65 +77,22 @@ public S3Object(
this.s3ObjectState = s3ObjectState;
}
- public void onApply() {
- if (this.s3ObjectState != S3ObjectState.UNINITIALIZED) {
- throw new IllegalStateException("Object is not in UNINITIALIZED state");
- }
- this.s3ObjectState = S3ObjectState.PREPARED;
- this.preparedTimeInMs = System.currentTimeMillis();
- }
-
- public void onCreate(S3ObjectCommitContext createContext) {
- // TODO: decide fetch object metadata from S3 or let broker send it to controller
- if (this.s3ObjectState != S3ObjectState.PREPARED) {
- throw new IllegalStateException("Object is not in APPLIED state");
- }
- this.s3ObjectState = S3ObjectState.COMMITTED;
- this.committedTimeInMs = createContext.committedTimeInMs;
- this.objectSize = createContext.objectSize;
- this.objectKey = createContext.objectAddress;
- }
-
- public void onMarkDestroy() {
- if (this.s3ObjectState != S3ObjectState.COMMITTED) {
- throw new IllegalStateException("Object is not in CREATED state");
- }
- this.s3ObjectState = S3ObjectState.MARK_DESTROYED;
- }
-
- public void onDestroy() {
- if (this.s3ObjectState != S3ObjectState.COMMITTED) {
- throw new IllegalStateException("Object is not in CREATED state");
- }
- // TODO: trigger destroy
-
- }
-
public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(new S3ObjectRecord()
.setObjectId(objectId)
.setObjectSize(objectSize)
- .setObjectState((byte) s3ObjectState.ordinal())
+ .setObjectState(s3ObjectState.toByte())
.setPreparedTimeInMs(preparedTimeInMs)
.setExpiredTimeInMs(expiredTimeInMs)
.setCommittedTimeInMs(committedTimeInMs)
.setDestroyedTimeInMs(destroyedTimeInMs), (short) 0);
}
- static public class S3ObjectCommitContext {
-
- private final long committedTimeInMs;
- private final long objectSize;
- private final String objectAddress;
-
- public S3ObjectCommitContext(
- final long committedTimeInMs,
- final long objectSize,
- final String objectAddress) {
- this.committedTimeInMs = committedTimeInMs;
- this.objectSize = objectSize;
- this.objectAddress = objectAddress;
- }
+ public static S3Object of(S3ObjectRecord record) {
+ return new S3Object(
+ record.objectId(), record.objectSize(), null,
+ record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(),
+ S3ObjectState.fromByte(record.objectState()));
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java
index 99e3bff9ab..3ca79b8079 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java
@@ -24,6 +24,10 @@ public enum S3ObjectState {
MARK_DESTROYED,
DESTROYED;
+ public byte toByte() {
+ return (byte) ordinal();
+ }
+
public static S3ObjectState fromByte(Byte b) {
int ordinal = b.intValue();
if (ordinal < 0 || ordinal >= values().length) {
diff --git a/metadata/src/main/resources/common/metadata/AssignedS3ObjectIdRecord.json b/metadata/src/main/resources/common/metadata/AssignedS3ObjectIdRecord.json
new file mode 100644
index 0000000000..a7bc582411
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/AssignedS3ObjectIdRecord.json
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 512,
+ "type": "metadata",
+ "name": "AssignedS3ObjectIdRecord",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ {
+ "name": "AssignedS3ObjectId",
+ "type": "int64",
+ "versions": "0+",
+ "about": "The last assigned S3 object's id"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/metadata/src/main/resources/common/metadata/AssignedStreamIdRecord.json b/metadata/src/main/resources/common/metadata/AssignedStreamIdRecord.json
new file mode 100644
index 0000000000..704039b7d6
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/AssignedStreamIdRecord.json
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 511,
+ "type": "metadata",
+ "name": "AssignedStreamIdRecord",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ {
+ "name": "AssignedStreamId",
+ "type": "int64",
+ "versions": "0+",
+ "about": "The last assigned stream's id"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/metadata/src/main/resources/common/metadata/RangeRecord.json b/metadata/src/main/resources/common/metadata/RangeRecord.json
index 8721c65752..867713123a 100644
--- a/metadata/src/main/resources/common/metadata/RangeRecord.json
+++ b/metadata/src/main/resources/common/metadata/RangeRecord.json
@@ -14,7 +14,7 @@
// limitations under the License.
{
- "apiKey": 25,
+ "apiKey": 503,
"type": "metadata",
"name": "RangeRecord",
"validVersions": "0",
diff --git a/metadata/src/main/resources/common/metadata/RemoveRangeRecord.json b/metadata/src/main/resources/common/metadata/RemoveRangeRecord.json
index dce6c5243c..43fbe99954 100644
--- a/metadata/src/main/resources/common/metadata/RemoveRangeRecord.json
+++ b/metadata/src/main/resources/common/metadata/RemoveRangeRecord.json
@@ -14,7 +14,7 @@
// limitations under the License.
{
- "apiKey": 26,
+ "apiKey": 504,
"type": "metadata",
"name": "RemoveRangeRecord",
"validVersions": "0",
diff --git a/metadata/src/main/resources/common/metadata/RemoveS3ObjectRecord.json b/metadata/src/main/resources/common/metadata/RemoveS3ObjectRecord.json
index 465d40c49e..107212fa6b 100644
--- a/metadata/src/main/resources/common/metadata/RemoveS3ObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/RemoveS3ObjectRecord.json
@@ -14,7 +14,7 @@
// limitations under the License.
{
- "apiKey": 32,
+ "apiKey": 510,
"type": "metadata",
"name": "RemoveS3ObjectRecord",
"validVersions": "0",
diff --git a/metadata/src/main/resources/common/metadata/RemoveS3StreamObjectRecord.json b/metadata/src/main/resources/common/metadata/RemoveS3StreamObjectRecord.json
index 729a0fc472..217d2d092c 100644
--- a/metadata/src/main/resources/common/metadata/RemoveS3StreamObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/RemoveS3StreamObjectRecord.json
@@ -14,7 +14,7 @@
// limitations under the License.
{
- "apiKey": 28,
+ "apiKey": 506,
"type": "metadata",
"name": "RemoveS3StreamObjectRecord",
"validVersions": "0",
diff --git a/metadata/src/main/resources/common/metadata/RemoveS3StreamRecord.json b/metadata/src/main/resources/common/metadata/RemoveS3StreamRecord.json
index 30156373a7..c830a0efeb 100644
--- a/metadata/src/main/resources/common/metadata/RemoveS3StreamRecord.json
+++ b/metadata/src/main/resources/common/metadata/RemoveS3StreamRecord.json
@@ -14,7 +14,7 @@
// limitations under the License.
{
- "apiKey": 23,
+ "apiKey": 502,
"type": "metadata",
"name": "RemoveS3StreamRecord",
"validVersions": "0",
diff --git a/metadata/src/main/resources/common/metadata/RemoveWALObjectRecord.json b/metadata/src/main/resources/common/metadata/RemoveWALObjectRecord.json
index ae126bd42e..e953b5dca1 100644
--- a/metadata/src/main/resources/common/metadata/RemoveWALObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/RemoveWALObjectRecord.json
@@ -14,7 +14,7 @@
// limitations under the License.
{
- "apiKey": 30,
+ "apiKey": 508,
"type": "metadata",
"name": "RemoveWALObjectRecord",
"validVersions": "0",
diff --git a/metadata/src/main/resources/common/metadata/S3ObjectRecord.json b/metadata/src/main/resources/common/metadata/S3ObjectRecord.json
index 006af11595..85bb4cd18d 100644
--- a/metadata/src/main/resources/common/metadata/S3ObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/S3ObjectRecord.json
@@ -14,7 +14,7 @@
// limitations under the License.
{
- "apiKey": 31,
+ "apiKey": 509,
"type": "metadata",
"name": "S3ObjectRecord",
"validVersions": "0",
diff --git a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
index 2d55d4cf47..95abda8988 100644
--- a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
@@ -14,7 +14,7 @@
// limitations under the License.
{
- "apiKey": 27,
+ "apiKey": 505,
"type": "metadata",
"name": "S3StreamObjectRecord",
"validVersions": "0",
diff --git a/metadata/src/main/resources/common/metadata/S3StreamRecord.json b/metadata/src/main/resources/common/metadata/S3StreamRecord.json
index 20c5a3a903..fb07283b4a 100644
--- a/metadata/src/main/resources/common/metadata/S3StreamRecord.json
+++ b/metadata/src/main/resources/common/metadata/S3StreamRecord.json
@@ -14,7 +14,7 @@
// limitations under the License.
{
- "apiKey": 22,
+ "apiKey": 501,
"type": "metadata",
"name": "S3StreamRecord",
"validVersions": "0",
diff --git a/metadata/src/main/resources/common/metadata/WALObjectRecord.json b/metadata/src/main/resources/common/metadata/WALObjectRecord.json
index 2724b023a3..a5fb9a859e 100644
--- a/metadata/src/main/resources/common/metadata/WALObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/WALObjectRecord.json
@@ -14,7 +14,7 @@
// limitations under the License.
{
- "apiKey": 29,
+ "apiKey": 507,
"type": "metadata",
"name": "WALObjectRecord",
"validVersions": "0",
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderESTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderESTest.java
index 57a6deb944..5302029f0a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderESTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderESTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.kafka.controller;
import org.apache.kafka.common.Uuid;
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java
index d0616cf541..2bddf32457 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java
@@ -23,19 +23,23 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;
-import org.apache.kafka.metadata.stream.SimplifiedS3Object;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-@Timeout(value = 40)
+@Timeout(40)
+@Tag("S3Unit")
public class S3ObjectsImageTest {
+
final static S3ObjectsImage IMAGE1;
final static List DELTA1_RECORDS;
@@ -45,12 +49,15 @@ public class S3ObjectsImageTest {
final static S3ObjectsImage IMAGE2;
static {
- Map map = new HashMap<>();
+ Map map = new HashMap<>();
for (int i = 0; i < 4; i++) {
- SimplifiedS3Object object = new SimplifiedS3Object(i, S3ObjectState.PREPARED);
- map.put(object.objectId(), object);
+ S3Object object = new S3Object(
+ i, -1, null,
+ -1, -1, -1, -1,
+ S3ObjectState.PREPARED);
+ map.put(object.getObjectId(), object);
}
- IMAGE1 = new S3ObjectsImage(map);
+ IMAGE1 = new S3ObjectsImage(3, map);
DELTA1_RECORDS = new ArrayList<>();
// try to update object0 and object1 to committed
// try to make object2 expired and mark it to be destroyed
@@ -67,19 +74,33 @@ public class S3ObjectsImageTest {
setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal()), (short) 0));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveS3ObjectRecord()
.setObjectId(3L), (short) 0));
+ DELTA1_RECORDS.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord()
+ .setAssignedS3ObjectId(4L), (short) 0));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord().
setObjectId(4L).
setObjectState((byte) S3ObjectState.PREPARED.ordinal()), (short) 0));
DELTA1 = new S3ObjectsDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
- Map map2 = new HashMap<>();
- map2.put(0L, new SimplifiedS3Object(0L, S3ObjectState.COMMITTED));
- map2.put(1L, new SimplifiedS3Object(1L, S3ObjectState.COMMITTED));
- map2.put(2L, new SimplifiedS3Object(2L, S3ObjectState.MARK_DESTROYED));
- map2.put(4L, new SimplifiedS3Object(4L, S3ObjectState.PREPARED));
-
- IMAGE2 = new S3ObjectsImage(map2);
+ Map map2 = new HashMap<>();
+ map2.put(0L, new S3Object(
+ 0L, -1, null,
+ -1, -1, -1, -1,
+ S3ObjectState.COMMITTED));
+ map2.put(1L, new S3Object(
+ 1L, -1, null,
+ -1, -1, -1, -1,
+ S3ObjectState.COMMITTED));
+ map2.put(2L, new S3Object(
+ 2L, -1, null,
+ -1, -1, -1, -1,
+ S3ObjectState.MARK_DESTROYED));
+ map2.put(4L, new S3Object(
+ 4L, -1, null,
+ -1, -1, -1, -1,
+ S3ObjectState.PREPARED));
+
+ IMAGE2 = new S3ObjectsImage(4L, map2);
}
@Test