Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,26 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
import org.apache.kafka.common.metadata.WALObjectRecord;
import org.apache.kafka.metadata.stream.S3WALObject;

public class BrokerS3WALMetadataDelta {

private final BrokerS3WALMetadataImage image;
private int brokerId;
private final Map<Long/*objectId*/, S3WALObject> addedS3WALObjects = new HashMap<>();

private final Set<Long/*objectId*/> removedS3WALObjects = new HashSet<>();

public BrokerS3WALMetadataDelta(BrokerS3WALMetadataImage image) {
this.image = image;
this.brokerId = image.getBrokerId();
}

public void replay(BrokerWALMetadataRecord record) {
this.brokerId = record.brokerId();
}

public void replay(WALObjectRecord record) {
Expand All @@ -56,7 +63,7 @@ public BrokerS3WALMetadataImage apply() {
newS3WALObjects.addAll(addedS3WALObjects.values());
// remove all removed WAL objects
newS3WALObjects.removeIf(s3WALObject -> removedS3WALObjects.contains(s3WALObject.objectId()));
return new BrokerS3WALMetadataImage(image.getBrokerId(), newS3WALObjects);
return new BrokerS3WALMetadataImage(this.brokerId, newS3WALObjects);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@

import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
import org.apache.kafka.metadata.stream.S3WALObject;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.server.common.ApiMessageAndVersion;

public class BrokerS3WALMetadataImage {

public static final BrokerS3WALMetadataImage EMPTY = new BrokerS3WALMetadataImage(-1, List.of());
private final Integer brokerId;
private final int brokerId;
private final List<S3WALObject> s3WalObjects;

public BrokerS3WALMetadataImage(Integer brokerId, List<S3WALObject> s3WalObjects) {
public BrokerS3WALMetadataImage(int brokerId, List<S3WALObject> s3WalObjects) {
this.brokerId = brokerId;
this.s3WalObjects = s3WalObjects;
}
Expand All @@ -43,7 +46,7 @@ public boolean equals(Object o) {
return false;
}
BrokerS3WALMetadataImage that = (BrokerS3WALMetadataImage) o;
return Objects.equals(brokerId, that.brokerId) && Objects.equals(s3WalObjects, that.s3WalObjects);
return brokerId == that.brokerId && Objects.equals(s3WalObjects, that.s3WalObjects);
}

@Override
Expand All @@ -52,14 +55,16 @@ public int hashCode() {
}

public void write(ImageWriter writer, ImageWriterOptions options) {
writer.write(new ApiMessageAndVersion(new BrokerWALMetadataRecord()
.setBrokerId(brokerId), (short) 0));
s3WalObjects.forEach(walObject -> writer.write(walObject.toRecord()));
}

public List<S3WALObject> getWalObjects() {
return s3WalObjects;
}

public Integer getBrokerId() {
public int getBrokerId() {
return brokerId;
}
}
49 changes: 39 additions & 10 deletions metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
Expand All @@ -32,6 +33,7 @@
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveBrokerWALMetadataRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
Expand All @@ -55,7 +57,9 @@
* A change to the broker metadata image.
*/
public final class MetadataDelta {

public static class Builder {

private MetadataImage image = MetadataImage.EMPTY;

public Builder setImage(MetadataImage image) {
Expand Down Expand Up @@ -104,7 +108,9 @@ public FeaturesDelta featuresDelta() {
}

public FeaturesDelta getOrCreateFeaturesDelta() {
if (featuresDelta == null) featuresDelta = new FeaturesDelta(image.features());
if (featuresDelta == null) {
featuresDelta = new FeaturesDelta(image.features());
}
return featuresDelta;
}

Expand All @@ -113,7 +119,9 @@ public ClusterDelta clusterDelta() {
}

public ClusterDelta getOrCreateClusterDelta() {
if (clusterDelta == null) clusterDelta = new ClusterDelta(image.cluster());
if (clusterDelta == null) {
clusterDelta = new ClusterDelta(image.cluster());
}
return clusterDelta;
}

Expand All @@ -122,7 +130,9 @@ public TopicsDelta topicsDelta() {
}

public TopicsDelta getOrCreateTopicsDelta() {
if (topicsDelta == null) topicsDelta = new TopicsDelta(image.topics());
if (topicsDelta == null) {
topicsDelta = new TopicsDelta(image.topics());
}
return topicsDelta;
}

Expand All @@ -131,7 +141,9 @@ public ConfigurationsDelta configsDelta() {
}

public ConfigurationsDelta getOrCreateConfigsDelta() {
if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs());
if (configsDelta == null) {
configsDelta = new ConfigurationsDelta(image.configs());
}
return configsDelta;
}

Expand All @@ -140,7 +152,9 @@ public ClientQuotasDelta clientQuotasDelta() {
}

public ClientQuotasDelta getOrCreateClientQuotasDelta() {
if (clientQuotasDelta == null) clientQuotasDelta = new ClientQuotasDelta(image.clientQuotas());
if (clientQuotasDelta == null) {
clientQuotasDelta = new ClientQuotasDelta(image.clientQuotas());
}
return clientQuotasDelta;
}

Expand All @@ -160,7 +174,9 @@ public AclsDelta aclsDelta() {
}

public AclsDelta getOrCreateAclsDelta() {
if (aclsDelta == null) aclsDelta = new AclsDelta(image.acls());
if (aclsDelta == null) {
aclsDelta = new AclsDelta(image.acls());
}
return aclsDelta;
}

Expand Down Expand Up @@ -291,7 +307,13 @@ public void replay(ApiMessage record) {
case ASSIGNED_STREAM_ID_RECORD:
replay((AssignedStreamIdRecord) record);
break;
// Kafka on S3 inject end
case BROKER_WALMETADATA_RECORD:
replay((BrokerWALMetadataRecord) record);
break;
case REMOVE_BROKER_WALMETADATA_RECORD:
replay((RemoveBrokerWALMetadataRecord) record);
break;
// Kafka on S3 inject end
default:
throw new RuntimeException("Unknown metadata record type " + type);
}
Expand Down Expand Up @@ -414,13 +436,21 @@ public void replay(AssignedS3ObjectIdRecord record) {
}

public void replay(AssignedStreamIdRecord record) {
getOrCreateStreamsMetadataDelta().replay(record);
}

public void replay(BrokerWALMetadataRecord record) {
getOrCreateStreamsMetadataDelta().replay(record);
}

public void replay(RemoveBrokerWALMetadataRecord record) {
getOrCreateStreamsMetadataDelta().replay(record);
}

// Kafka on S3 inject end

/**
* Create removal deltas for anything which was in the base image, but which was not
* referenced in the snapshot records we just applied.
* Create removal deltas for anything which was in the base image, but which was not referenced in the snapshot records we just applied.
*/
public void finishSnapshot() {
getOrCreateFeaturesDelta().finishSnapshot();
Expand Down Expand Up @@ -506,7 +536,6 @@ private S3ObjectsImage getNewS3ObjectsMetadataImage() {
image.objectsMetadata() : s3ObjectsDelta.apply();
}


// Kafka on S3 inject end

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class S3ObjectsDelta {

public S3ObjectsDelta(S3ObjectsImage image) {
this.image = image;
this.currentAssignedObjectId = image.nextAssignedObjectId() - 1;
}

public S3ObjectsImage image() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,24 @@

package org.apache.kafka.image;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
import org.apache.kafka.common.metadata.S3StreamObjectRecord;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;

public class S3StreamMetadataDelta {
private final S3StreamMetadataImage image;

private Long newEpoch;
private long streamId;
private long newStartOffset;
private long newEpoch;

private final Map<Integer/*rangeIndex*/, RangeMetadata> changedRanges = new HashMap<>();
private final Set<Integer/*rangeIndex*/> removedRanges = new HashSet<>();
Expand All @@ -43,6 +44,13 @@ public class S3StreamMetadataDelta {
public S3StreamMetadataDelta(S3StreamMetadataImage image) {
this.image = image;
this.newEpoch = image.getEpoch();
this.streamId = image.getStreamId();
this.newStartOffset = image.getStartOffset();
}
public void replay(S3StreamRecord record) {
this.streamId = record.streamId();
this.newEpoch = record.epoch();
this.newStartOffset = record.startOffset();
}

public void replay(RangeRecord record) {
Expand Down Expand Up @@ -72,15 +80,15 @@ public void replay(RemoveS3StreamObjectRecord record) {
public S3StreamMetadataImage apply() {
Map<Integer, RangeMetadata> newRanges = new HashMap<>(image.getRanges());
// add all new changed ranges
newRanges.putAll(image.getRanges());
newRanges.putAll(changedRanges);
// remove all removed ranges
removedRanges.forEach(newRanges::remove);
List<S3StreamObject> newS3StreamObjects = new ArrayList<>(image.getStreamObjects());
Map<Long, S3StreamObject> newS3StreamObjects = new HashMap<>(image.getStreamObjects());
// add all changed stream-objects
newS3StreamObjects.addAll(changedS3StreamObjects.values());
newS3StreamObjects.putAll(changedS3StreamObjects);
// remove all removed stream-objects
newS3StreamObjects.removeIf(removedS3StreamObjectIds::contains);
return new S3StreamMetadataImage(image.getStreamId(), newEpoch, image.getStartOffset(), newRanges, newS3StreamObjects);
removedS3StreamObjectIds.forEach(newS3StreamObjects::remove);
return new S3StreamMetadataImage(streamId, newEpoch, newStartOffset, newRanges, newS3StreamObjects);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.kafka.image;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.metadata.S3StreamRecord;
Expand All @@ -29,24 +28,24 @@
public class S3StreamMetadataImage {

public static final S3StreamMetadataImage EMPTY =
new S3StreamMetadataImage(-1L, -1L, -1L, Map.of(), List.of());
new S3StreamMetadataImage(-1L, -1L, -1L, Map.of(), Map.of());

private final Long streamId;
private final long streamId;

private final Long epoch;
private final long epoch;

private final Long startOffset;
private final long startOffset;

private final Map<Integer/*rangeIndex*/, RangeMetadata> ranges;

private final List<S3StreamObject> streamObjects;
private final Map<Long/*objectId*/, S3StreamObject> streamObjects;

public S3StreamMetadataImage(
Long streamId,
Long epoch,
Long startOffset,
long streamId,
long epoch,
long startOffset,
Map<Integer, RangeMetadata> ranges,
List<S3StreamObject> streamObjects) {
Map<Long, S3StreamObject> streamObjects) {
this.streamId = streamId;
this.epoch = epoch;
this.startOffset = startOffset;
Expand All @@ -60,26 +59,26 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
.setEpoch(epoch)
.setStartOffset(startOffset));
ranges.values().forEach(rangeMetadata -> writer.write(rangeMetadata.toRecord()));
streamObjects.forEach(streamObject -> writer.write(streamObject.toRecord()));
streamObjects.values().forEach(streamObject -> writer.write(streamObject.toRecord()));
}

public Map<Integer, RangeMetadata> getRanges() {
return ranges;
}

public List<S3StreamObject> getStreamObjects() {
public Map<Long, S3StreamObject> getStreamObjects() {
return streamObjects;
}

public Long getEpoch() {
public long getEpoch() {
return epoch;
}

public Long getStartOffset() {
public long getStartOffset() {
return startOffset;
}

public Long getStreamId() {
public long getStreamId() {
return streamId;
}

Expand All @@ -92,8 +91,11 @@ public boolean equals(Object o) {
return false;
}
S3StreamMetadataImage that = (S3StreamMetadataImage) o;
return Objects.equals(streamId, that.streamId) && Objects.equals(epoch, that.epoch) && Objects.equals(startOffset,
that.startOffset) && Objects.equals(ranges, that.ranges) && Objects.equals(streamObjects, that.streamObjects);
return this.streamId == that.streamId &&
this.epoch == that.epoch &&
this.startOffset == that.startOffset &&
this.ranges.equals(that.ranges) &&
this.streamObjects.equals(that.streamObjects);
}

@Override
Expand Down
Loading