Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<allow pkg="org.apache.kafka.common.es" />
</subpackage>

<subpackage name="image">
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.es;

import java.util.concurrent.atomic.AtomicBoolean;
Expand Down
16 changes: 16 additions & 0 deletions metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.kafka.image;

import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
Expand Down Expand Up @@ -283,6 +285,12 @@ public void replay(ApiMessage record) {
case REMOVE_S3_OBJECT_RECORD:
replay((RemoveS3ObjectRecord) record);
break;
case ASSIGNED_S3_OBJECT_ID_RECORD:
replay((AssignedS3ObjectIdRecord) record);
break;
case ASSIGNED_STREAM_ID_RECORD:
replay((AssignedStreamIdRecord) record);
break;
// Kafka on S3 inject end
default:
throw new RuntimeException("Unknown metadata record type " + type);
Expand Down Expand Up @@ -401,6 +409,13 @@ public void replay(RemoveS3ObjectRecord record) {
getOrCreateObjectsMetadataDelta().replay(record);
}

public void replay(AssignedS3ObjectIdRecord record) {
getOrCreateObjectsMetadataDelta().replay(record);
}

public void replay(AssignedStreamIdRecord record) {
}

// Kafka on S3 inject end

/**
Expand Down Expand Up @@ -491,6 +506,7 @@ private S3ObjectsImage getNewS3ObjectsMetadataImage() {
image.objectsMetadata() : s3ObjectsDelta.apply();
}


// Kafka on S3 inject end

@Override
Expand Down
28 changes: 18 additions & 10 deletions metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.metadata.stream.SimplifiedS3Object;
import org.apache.kafka.metadata.stream.S3Object;

/**
* Represents changes to a S3 object in the metadata image.
*/
public final class S3ObjectsDelta {

private final S3ObjectsImage image;

private final Set<SimplifiedS3Object> addedObjects = new HashSet<>();
private final Map<Long/*objectId*/, S3Object> changedObjects = new HashMap<>();

private final Set<Long/*objectId*/> removedObjectIds = new HashSet<>();

private long currentAssignedObjectId;

public S3ObjectsDelta(S3ObjectsImage image) {
this.image = image;
}
Expand All @@ -43,34 +47,38 @@ public S3ObjectsImage image() {
return image;
}

public Set<SimplifiedS3Object> addedObjects() {
return addedObjects;
public Map<Long, S3Object> changedObjects() {
return changedObjects;
}

public Set<Long> removedObjects() {
return removedObjectIds;
}

public void replay(AssignedS3ObjectIdRecord record) {
currentAssignedObjectId = record.assignedS3ObjectId();
}

public void replay(S3ObjectRecord record) {
addedObjects.add(SimplifiedS3Object.of(record));
changedObjects.put(record.objectId(), S3Object.of(record));
// new add or update, so remove from removedObjects
removedObjectIds.remove(record.objectId());
}

public void replay(RemoveS3ObjectRecord record) {
removedObjectIds.add(record.objectId());
// new remove, so remove from addedObjects
addedObjects.removeIf(obj -> obj.objectId() == record.objectId());
changedObjects.remove(record.objectId());
}

public S3ObjectsImage apply() {
// get original objects first
Map<Long, SimplifiedS3Object> newObjectsMetadata = new HashMap<>(image.objectsMetadata());
// put all new added objects
addedObjects.forEach(obj -> newObjectsMetadata.put(obj.objectId(), obj));
Map<Long, S3Object> newObjectsMetadata = new HashMap<>(image.objectsMetadata());
// put all new changed objects
newObjectsMetadata.putAll(changedObjects);
// remove all removed objects
removedObjectIds.forEach(newObjectsMetadata::remove);
return new S3ObjectsImage(newObjectsMetadata);
return new S3ObjectsImage(currentAssignedObjectId, newObjectsMetadata);
}

}
32 changes: 23 additions & 9 deletions metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,44 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.stream.SimplifiedS3Object;
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.server.common.ApiMessageAndVersion;

/**
* Represents the S3 objects in the metadata image.
*
* <p>
* This class is thread-safe.
*/
public final class S3ObjectsImage {

public static final S3ObjectsImage EMPTY =
new S3ObjectsImage(Collections.emptyMap());
new S3ObjectsImage(-1, Collections.emptyMap());

private long nextAssignedObjectId;

private final Map<Long/*objectId*/, SimplifiedS3Object> objectsMetadata;
private final Map<Long/*objectId*/, S3Object> objectsMetadata;

public S3ObjectsImage(final Map<Long, SimplifiedS3Object> objectsMetadata) {
public S3ObjectsImage(long assignedObjectId, final Map<Long, S3Object> objectsMetadata) {
this.nextAssignedObjectId = assignedObjectId + 1;
this.objectsMetadata = objectsMetadata;
}

public Map<Long, SimplifiedS3Object> objectsMetadata() {
public Map<Long, S3Object> objectsMetadata() {
return objectsMetadata;
}

public long nextAssignedObjectId() {
return nextAssignedObjectId;
}

public void write(ImageWriter writer, ImageWriterOptions options) {
objectsMetadata.values().stream().map(SimplifiedS3Object::toRecord).forEach(writer::write);
writer.write(
new ApiMessageAndVersion(
new AssignedS3ObjectIdRecord().setAssignedS3ObjectId(nextAssignedObjectId - 1), (short) 0));
objectsMetadata.values().stream().map(S3Object::toRecord).forEach(writer::write);
}

@Override
Expand All @@ -56,11 +69,12 @@ public boolean equals(Object o) {
return false;
}
S3ObjectsImage that = (S3ObjectsImage) o;
return Objects.equals(objectsMetadata, that.objectsMetadata);
return this.nextAssignedObjectId == that.nextAssignedObjectId &&
objectsMetadata.equals(that.objectsMetadata);
}

@Override
public int hashCode() {
return Objects.hash(objectsMetadata);
return Objects.hash(nextAssignedObjectId, objectsMetadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,65 +77,22 @@ public S3Object(
this.s3ObjectState = s3ObjectState;
}

public void onApply() {
if (this.s3ObjectState != S3ObjectState.UNINITIALIZED) {
throw new IllegalStateException("Object is not in UNINITIALIZED state");
}
this.s3ObjectState = S3ObjectState.PREPARED;
this.preparedTimeInMs = System.currentTimeMillis();
}

public void onCreate(S3ObjectCommitContext createContext) {
// TODO: decide fetch object metadata from S3 or let broker send it to controller
if (this.s3ObjectState != S3ObjectState.PREPARED) {
throw new IllegalStateException("Object is not in APPLIED state");
}
this.s3ObjectState = S3ObjectState.COMMITTED;
this.committedTimeInMs = createContext.committedTimeInMs;
this.objectSize = createContext.objectSize;
this.objectKey = createContext.objectAddress;
}

public void onMarkDestroy() {
if (this.s3ObjectState != S3ObjectState.COMMITTED) {
throw new IllegalStateException("Object is not in CREATED state");
}
this.s3ObjectState = S3ObjectState.MARK_DESTROYED;
}

public void onDestroy() {
if (this.s3ObjectState != S3ObjectState.COMMITTED) {
throw new IllegalStateException("Object is not in CREATED state");
}
// TODO: trigger destroy

}

public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(new S3ObjectRecord()
.setObjectId(objectId)
.setObjectSize(objectSize)
.setObjectState((byte) s3ObjectState.ordinal())
.setObjectState(s3ObjectState.toByte())
.setPreparedTimeInMs(preparedTimeInMs)
.setExpiredTimeInMs(expiredTimeInMs)
.setCommittedTimeInMs(committedTimeInMs)
.setDestroyedTimeInMs(destroyedTimeInMs), (short) 0);
}

static public class S3ObjectCommitContext {

private final long committedTimeInMs;
private final long objectSize;
private final String objectAddress;

public S3ObjectCommitContext(
final long committedTimeInMs,
final long objectSize,
final String objectAddress) {
this.committedTimeInMs = committedTimeInMs;
this.objectSize = objectSize;
this.objectAddress = objectAddress;
}
public static S3Object of(S3ObjectRecord record) {
return new S3Object(
record.objectId(), record.objectSize(), null,
record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(),
S3ObjectState.fromByte(record.objectState()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public enum S3ObjectState {
MARK_DESTROYED,
DESTROYED;

public byte toByte() {
return (byte) ordinal();
}

public static S3ObjectState fromByte(Byte b) {
int ordinal = b.intValue();
if (ordinal < 0 || ordinal >= values().length) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 512,
"type": "metadata",
"name": "AssignedS3ObjectIdRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "AssignedS3ObjectId",
"type": "int64",
"versions": "0+",
"about": "The last assigned S3 object's id"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 511,
"type": "metadata",
"name": "AssignedStreamIdRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "AssignedStreamId",
"type": "int64",
"versions": "0+",
"about": "The last assigned stream's id"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

{
"apiKey": 25,
"apiKey": 503,
"type": "metadata",
"name": "RangeRecord",
"validVersions": "0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

{
"apiKey": 26,
"apiKey": 504,
"type": "metadata",
"name": "RemoveRangeRecord",
"validVersions": "0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

{
"apiKey": 32,
"apiKey": 510,
"type": "metadata",
"name": "RemoveS3ObjectRecord",
"validVersions": "0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

{
"apiKey": 28,
"apiKey": 506,
"type": "metadata",
"name": "RemoveS3StreamObjectRecord",
"validVersions": "0",
Expand Down
Loading