diff --git a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java
index 1c9c04f3f2..2853a68986 100644
--- a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
 import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
 import org.apache.kafka.common.metadata.WALObjectRecord;
 import org.apache.kafka.metadata.stream.S3WALObject;
@@ -30,12 +31,18 @@ public class BrokerS3WALMetadataDelta {
 
     private final BrokerS3WALMetadataImage image;
+    private int brokerId;
 
     private final Map<Long, S3WALObject> addedS3WALObjects = new HashMap<>();
 
     private final Set<Long> removedS3WALObjects = new HashSet<>();
 
     public BrokerS3WALMetadataDelta(BrokerS3WALMetadataImage image) {
         this.image = image;
+        this.brokerId = image.getBrokerId();
+    }
+
+    public void replay(BrokerWALMetadataRecord record) {
+        this.brokerId = record.brokerId();
     }
 
     public void replay(WALObjectRecord record) {
@@ -56,7 +63,7 @@ public BrokerS3WALMetadataImage apply() {
         newS3WALObjects.addAll(addedS3WALObjects.values());
         // remove all removed WAL objects
         newS3WALObjects.removeIf(s3WALObject -> removedS3WALObjects.contains(s3WALObject.objectId()));
-        return new BrokerS3WALMetadataImage(image.getBrokerId(), newS3WALObjects);
+        return new BrokerS3WALMetadataImage(this.brokerId, newS3WALObjects);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java
index e99a01b22b..e5bf74e658 100644
--- a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java
@@ -20,16 +20,19 @@
 import java.util.List;
 import java.util.Objects;
+import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
 import org.apache.kafka.metadata.stream.S3WALObject;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 
 public class BrokerS3WALMetadataImage {
+    public static final BrokerS3WALMetadataImage EMPTY = new BrokerS3WALMetadataImage(-1, List.of());
 
-    private final Integer brokerId;
+    private final int brokerId;
 
     private final List<S3WALObject> s3WalObjects;
 
-    public BrokerS3WALMetadataImage(Integer brokerId, List<S3WALObject> s3WalObjects) {
+    public BrokerS3WALMetadataImage(int brokerId, List<S3WALObject> s3WalObjects) {
         this.brokerId = brokerId;
         this.s3WalObjects = s3WalObjects;
     }
@@ -43,7 +46,7 @@ public boolean equals(Object o) {
             return false;
         }
         BrokerS3WALMetadataImage that = (BrokerS3WALMetadataImage) o;
-        return Objects.equals(brokerId, that.brokerId) && Objects.equals(s3WalObjects, that.s3WalObjects);
+        return brokerId == that.brokerId && Objects.equals(s3WalObjects, that.s3WalObjects);
     }
 
     @Override
@@ -52,6 +55,8 @@ public int hashCode() {
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
+        writer.write(new ApiMessageAndVersion(new BrokerWALMetadataRecord()
+            .setBrokerId(brokerId), (short) 0));
         s3WalObjects.forEach(walObject -> writer.write(walObject.toRecord()));
     }
 
@@ -59,7 +64,7 @@ public List<S3WALObject> getWalObjects() {
         return s3WalObjects;
     }
 
-    public Integer getBrokerId() {
+    public int getBrokerId() {
         return brokerId;
     }
 }
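The two halves of this change work together: the image now writes a BrokerWALMetadataRecord ahead of its WAL objects, and the delta replays it to learn its broker id, so a delta built on BrokerS3WALMetadataImage.EMPTY (broker id -1) still applies into a correctly keyed image. A minimal sketch of the intended round trip, using only APIs from this patch (not code from the patch itself):

    // Hypothetical usage sketch:
    BrokerS3WALMetadataDelta delta = new BrokerS3WALMetadataDelta(BrokerS3WALMetadataImage.EMPTY);
    delta.replay(new BrokerWALMetadataRecord().setBrokerId(42)); // delta learns its broker id
    BrokerS3WALMetadataImage image = delta.apply();
    // image.getBrokerId() == 42, not the -1 inherited from EMPTY
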
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 1093272b21..81f61eeaed 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
 import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -32,6 +33,7 @@
 import org.apache.kafka.common.metadata.RangeRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveBrokerWALMetadataRecord;
 import org.apache.kafka.common.metadata.RemoveRangeRecord;
 import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
@@ -55,7 +57,9 @@
  * A change to the broker metadata image.
  */
 public final class MetadataDelta {
+
     public static class Builder {
+
         private MetadataImage image = MetadataImage.EMPTY;
 
         public Builder setImage(MetadataImage image) {
@@ -104,7 +108,9 @@ public FeaturesDelta featuresDelta() {
     }
 
     public FeaturesDelta getOrCreateFeaturesDelta() {
-        if (featuresDelta == null) featuresDelta = new FeaturesDelta(image.features());
+        if (featuresDelta == null) {
+            featuresDelta = new FeaturesDelta(image.features());
+        }
         return featuresDelta;
     }
 
@@ -113,7 +119,9 @@ public ClusterDelta clusterDelta() {
     }
 
     public ClusterDelta getOrCreateClusterDelta() {
-        if (clusterDelta == null) clusterDelta = new ClusterDelta(image.cluster());
+        if (clusterDelta == null) {
+            clusterDelta = new ClusterDelta(image.cluster());
+        }
         return clusterDelta;
     }
 
@@ -122,7 +130,9 @@ public TopicsDelta topicsDelta() {
     }
 
     public TopicsDelta getOrCreateTopicsDelta() {
-        if (topicsDelta == null) topicsDelta = new TopicsDelta(image.topics());
+        if (topicsDelta == null) {
+            topicsDelta = new TopicsDelta(image.topics());
+        }
         return topicsDelta;
     }
 
@@ -131,7 +141,9 @@ public ConfigurationsDelta configsDelta() {
     }
 
     public ConfigurationsDelta getOrCreateConfigsDelta() {
-        if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs());
+        if (configsDelta == null) {
+            configsDelta = new ConfigurationsDelta(image.configs());
+        }
         return configsDelta;
     }
 
@@ -140,7 +152,9 @@ public ClientQuotasDelta clientQuotasDelta() {
     }
 
     public ClientQuotasDelta getOrCreateClientQuotasDelta() {
-        if (clientQuotasDelta == null) clientQuotasDelta = new ClientQuotasDelta(image.clientQuotas());
+        if (clientQuotasDelta == null) {
+            clientQuotasDelta = new ClientQuotasDelta(image.clientQuotas());
+        }
         return clientQuotasDelta;
     }
 
@@ -160,7 +174,9 @@ public AclsDelta aclsDelta() {
     }
 
     public AclsDelta getOrCreateAclsDelta() {
-        if (aclsDelta == null) aclsDelta = new AclsDelta(image.acls());
+        if (aclsDelta == null) {
+            aclsDelta = new AclsDelta(image.acls());
+        }
         return aclsDelta;
     }
 
@@ -291,7 +307,13 @@ public void replay(ApiMessage record) {
             case ASSIGNED_STREAM_ID_RECORD:
                 replay((AssignedStreamIdRecord) record);
                 break;
-            // Kafka on S3 inject end
+            case BROKER_WALMETADATA_RECORD:
+                replay((BrokerWALMetadataRecord) record);
+                break;
+            case REMOVE_BROKER_WALMETADATA_RECORD:
+                replay((RemoveBrokerWALMetadataRecord) record);
+                break;
+            // Kafka on S3 inject end
             default:
                 throw new RuntimeException("Unknown metadata record type " + type);
         }
@@ -414,13 +436,21 @@ public void replay(AssignedS3ObjectIdRecord record) {
     }
 
     public void replay(AssignedStreamIdRecord record) {
+        getOrCreateStreamsMetadataDelta().replay(record);
+    }
+
+    public void replay(BrokerWALMetadataRecord record) {
+        getOrCreateStreamsMetadataDelta().replay(record);
+    }
+
+    public void replay(RemoveBrokerWALMetadataRecord record) {
         getOrCreateStreamsMetadataDelta().replay(record);
     }
 
     // Kafka on S3 inject end
 
     /**
-     * Create removal deltas for anything which was in the base image, but which was not
-     * referenced in the snapshot records we just applied.
+     * Create removal deltas for anything which was in the base image, but which was not referenced in the snapshot records we just applied.
      */
     public void finishSnapshot() {
         getOrCreateFeaturesDelta().finishSnapshot();
@@ -506,7 +536,6 @@ private S3ObjectsImage getNewS3ObjectsMetadataImage() {
             image.objectsMetadata() : s3ObjectsDelta.apply();
     }
 
-    // Kafka on S3 inject end
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java
index 1408c7ae34..5673d0b2a8 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java
@@ -41,6 +41,7 @@ public final class S3ObjectsDelta {
 
     public S3ObjectsDelta(S3ObjectsImage image) {
         this.image = image;
+        this.currentAssignedObjectId = image.nextAssignedObjectId() - 1;
     }
 
     public S3ObjectsImage image() {
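The S3ObjectsDelta change seeds the delta's assigned-id counter from the base image, so a delta that never replays an AssignedS3ObjectIdRecord no longer folds a stale counter into the image it produces. A sketch of the behaviour this enables (the example values are hypothetical; delta.apply() is the existing apply path):

    // Assume image.nextAssignedObjectId() == 5
    S3ObjectsDelta delta = new S3ObjectsDelta(image);
    // ... replay records that touch objects but never the assigned-id counter ...
    S3ObjectsImage next = delta.apply();
    // next.nextAssignedObjectId() is still 5 instead of being reset
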
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
index 6dad5f3b2b..c823ba3f6d 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
@@ -17,23 +17,24 @@
 
 package org.apache.kafka.image;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.kafka.common.metadata.RangeRecord;
 import org.apache.kafka.common.metadata.RemoveRangeRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
 import org.apache.kafka.common.metadata.S3StreamObjectRecord;
+import org.apache.kafka.common.metadata.S3StreamRecord;
 import org.apache.kafka.metadata.stream.RangeMetadata;
 import org.apache.kafka.metadata.stream.S3StreamObject;
 
 public class S3StreamMetadataDelta {
 
     private final S3StreamMetadataImage image;
 
-    private Long newEpoch;
+    private long streamId;
+    private long newStartOffset;
+    private long newEpoch;
 
     private final Map<Integer, RangeMetadata> changedRanges = new HashMap<>();
     private final Set<Integer> removedRanges = new HashSet<>();
@@ -43,6 +44,13 @@ public class S3StreamMetadataDelta {
     public S3StreamMetadataDelta(S3StreamMetadataImage image) {
         this.image = image;
         this.newEpoch = image.getEpoch();
+        this.streamId = image.getStreamId();
+        this.newStartOffset = image.getStartOffset();
+    }
+
+    public void replay(S3StreamRecord record) {
+        this.streamId = record.streamId();
+        this.newEpoch = record.epoch();
+        this.newStartOffset = record.startOffset();
     }
 
     public void replay(RangeRecord record) {
@@ -72,15 +80,15 @@ public void replay(RemoveS3StreamObjectRecord record) {
     }
 
     public S3StreamMetadataImage apply() {
         Map<Integer, RangeMetadata> newRanges = new HashMap<>(image.getRanges());
         // add all new changed ranges
-        newRanges.putAll(image.getRanges());
+        newRanges.putAll(changedRanges);
         // remove all removed ranges
         removedRanges.forEach(newRanges::remove);
-        List<S3StreamObject> newS3StreamObjects = new ArrayList<>(image.getStreamObjects());
+        Map<Long, S3StreamObject> newS3StreamObjects = new HashMap<>(image.getStreamObjects());
         // add all changed stream-objects
-        newS3StreamObjects.addAll(changedS3StreamObjects.values());
+        newS3StreamObjects.putAll(changedS3StreamObjects);
         // remove all removed stream-objects
-        newS3StreamObjects.removeIf(removedS3StreamObjectIds::contains);
-        return new S3StreamMetadataImage(image.getStreamId(), newEpoch, image.getStartOffset(), newRanges, newS3StreamObjects);
+        removedS3StreamObjectIds.forEach(newS3StreamObjects::remove);
+        return new S3StreamMetadataImage(streamId, newEpoch, newStartOffset, newRanges, newS3StreamObjects);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
index 0ce386d53a..2eb3521615 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.image;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.kafka.common.metadata.S3StreamRecord;
@@ -29,24 +28,24 @@
 public class S3StreamMetadataImage {
 
     public static final S3StreamMetadataImage EMPTY =
-        new S3StreamMetadataImage(-1L, -1L, -1L, Map.of(), List.of());
+        new S3StreamMetadataImage(-1L, -1L, -1L, Map.of(), Map.of());
 
-    private final Long streamId;
+    private final long streamId;
 
-    private final Long epoch;
+    private final long epoch;
 
-    private final Long startOffset;
+    private final long startOffset;
 
     private final Map<Integer, RangeMetadata> ranges;
 
-    private final List<S3StreamObject> streamObjects;
+    private final Map<Long, S3StreamObject> streamObjects;
 
     public S3StreamMetadataImage(
-        Long streamId,
-        Long epoch,
-        Long startOffset,
+        long streamId,
+        long epoch,
+        long startOffset,
         Map<Integer, RangeMetadata> ranges,
-        List<S3StreamObject> streamObjects) {
+        Map<Long, S3StreamObject> streamObjects) {
         this.streamId = streamId;
         this.epoch = epoch;
         this.startOffset = startOffset;
@@ -60,26 +59,26 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
             .setEpoch(epoch)
             .setStartOffset(startOffset));
         ranges.values().forEach(rangeMetadata -> writer.write(rangeMetadata.toRecord()));
-        streamObjects.forEach(streamObject -> writer.write(streamObject.toRecord()));
+        streamObjects.values().forEach(streamObject -> writer.write(streamObject.toRecord()));
     }
 
     public Map<Integer, RangeMetadata> getRanges() {
         return ranges;
     }
 
-    public List<S3StreamObject> getStreamObjects() {
+    public Map<Long, S3StreamObject> getStreamObjects() {
         return streamObjects;
     }
 
-    public Long getEpoch() {
+    public long getEpoch() {
         return epoch;
     }
 
-    public Long getStartOffset() {
+    public long getStartOffset() {
         return startOffset;
     }
 
-    public Long getStreamId() {
+    public long getStreamId() {
         return streamId;
     }
 
@@ -92,8 +91,11 @@ public boolean equals(Object o) {
             return false;
         }
         S3StreamMetadataImage that = (S3StreamMetadataImage) o;
-        return Objects.equals(streamId, that.streamId) && Objects.equals(epoch, that.epoch) && Objects.equals(startOffset,
-            that.startOffset) && Objects.equals(ranges, that.ranges) && Objects.equals(streamObjects, that.streamObjects);
+        return this.streamId == that.streamId &&
+            this.epoch == that.epoch &&
+            this.startOffset == that.startOffset &&
+            this.ranges.equals(that.ranges) &&
+            this.streamObjects.equals(that.streamObjects);
     }
 
     @Override
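Two real bugs die in this apply() rework: newRanges.putAll(image.getRanges()) re-inserted the previous image's ranges instead of the replayed changes, and the stream objects move from a List to a Map keyed by object id, so removals become key lookups rather than scans. The reworked apply() methods in this patch all follow the same copy-overlay-remove idiom, sketched generically below (the helper name is mine, not from the patch):

    // Generic shape of the reworked apply() methods:
    static <K, V> Map<K, V> merge(Map<K, V> base, Map<K, V> changed, Set<K> removed) {
        Map<K, V> result = new HashMap<>(base); // start from the previous image
        result.putAll(changed);                 // overlay entries replayed into the delta
        removed.forEach(result::remove);        // drop tombstoned keys last
        return result;
    }
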
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java
index f4ac0f58d6..bcf5a206c6 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java
@@ -17,12 +17,14 @@
 
 package org.apache.kafka.image;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
+import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
 import org.apache.kafka.common.metadata.RangeRecord;
+import org.apache.kafka.common.metadata.RemoveBrokerWALMetadataRecord;
 import org.apache.kafka.common.metadata.RemoveRangeRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
@@ -35,6 +37,8 @@ public final class S3StreamsMetadataDelta {
 
     private final S3StreamsMetadataImage image;
 
+    private long currentAssignedStreamId;
+
     private final Map<Long, S3StreamMetadataDelta> changedStreams = new HashMap<>();
 
     private final Map<Integer, BrokerS3WALMetadataDelta> changedBrokers = new HashMap<>();
@@ -44,32 +48,41 @@ public final class S3StreamsMetadataDelta {
     // We don't use pair of specify BrokerCreateRecord and BrokerRemoveRecord to create or remove brokers, and
     // we create BrokerStreamMetadataImage when we create the first WALObjectRecord for a broker,
     // so we should decide when to recycle the broker's memory data structure
-    private final Set<Long> deletedBrokers = new HashSet<>();
+    private final Set<Integer> deletedBrokers = new HashSet<>();
 
     public S3StreamsMetadataDelta(S3StreamsMetadataImage image) {
         this.image = image;
+        this.currentAssignedStreamId = image.nextAssignedStreamId() - 1;
+    }
+
+    public void replay(AssignedStreamIdRecord record) {
+        this.currentAssignedStreamId = record.assignedStreamId();
     }
 
     public void replay(S3StreamRecord record) {
-        S3StreamMetadataDelta delta;
-        if (!image.getStreamsMetadata().containsKey(record.streamId())) {
-            // create a new StreamMetadata with empty ranges and streams if not exist
-            delta = new S3StreamMetadataDelta(
-                new S3StreamMetadataImage(record.streamId(), record.epoch(), record.startOffset(), Collections.emptyMap(), Collections.emptyList()));
-        } else {
-            // update the epoch if exist
-            S3StreamMetadataImage s3StreamMetadataImage = image.getStreamsMetadata().get(record.streamId());
-            delta = new S3StreamMetadataDelta(
-                new S3StreamMetadataImage(record.streamId(), record.epoch(), record.startOffset(), s3StreamMetadataImage.getRanges(),
-                    s3StreamMetadataImage.getStreamObjects()));
+        getOrCreateStreamMetadataDelta(record.streamId()).replay(record);
+        if (deletedStreams.contains(record.streamId())) {
+            deletedStreams.remove(record.streamId());
         }
-        // add the delta to the changedStreams
-        changedStreams.put(record.streamId(), delta);
     }
 
     public void replay(RemoveS3StreamRecord record) {
         // add the streamId to the deletedStreams
         deletedStreams.add(record.streamId());
+        changedStreams.remove(record.streamId());
+    }
+
+    public void replay(BrokerWALMetadataRecord record) {
+        getOrCreateBrokerStreamMetadataDelta(record.brokerId()).replay(record);
+        if (deletedBrokers.contains(record.brokerId())) {
+            deletedBrokers.remove(record.brokerId());
+        }
+    }
+
+    public void replay(RemoveBrokerWALMetadataRecord record) {
+        // add the brokerId to the deletedBrokers
+        deletedBrokers.add(record.brokerId());
+        changedBrokers.remove(record.brokerId());
     }
 
     public void replay(RangeRecord record) {
@@ -99,7 +112,7 @@ public void replay(RemoveWALObjectRecord record) {
 
     private S3StreamMetadataDelta getOrCreateStreamMetadataDelta(Long streamId) {
         S3StreamMetadataDelta delta = changedStreams.get(streamId);
         if (delta == null) {
-            delta = new S3StreamMetadataDelta(image.getStreamsMetadata().get(streamId));
+            delta = new S3StreamMetadataDelta(image.streamsMetadata().getOrDefault(streamId, S3StreamMetadataImage.EMPTY));
             changedStreams.put(streamId, delta);
         }
         return delta;
@@ -109,59 +122,34 @@ private BrokerS3WALMetadataDelta getOrCreateBrokerStreamMetadataDelta(Integer brokerId) {
         BrokerS3WALMetadataDelta delta = changedBrokers.get(brokerId);
         if (delta == null) {
             delta = new BrokerS3WALMetadataDelta(
-                image.getBrokerWALMetadata().
-                    getOrDefault(brokerId, new BrokerS3WALMetadataImage(brokerId, Collections.emptyList())));
+                image.brokerWALMetadata().
+                    getOrDefault(brokerId, BrokerS3WALMetadataImage.EMPTY));
             changedBrokers.put(brokerId, delta);
         }
         return delta;
     }
 
     S3StreamsMetadataImage apply() {
-        Map<Long, S3StreamMetadataImage> newStreams = new HashMap<>(image.getStreamsMetadata().size());
-        Map<Integer, BrokerS3WALMetadataImage> newBrokerStreams = new HashMap<>(image.getBrokerWALMetadata().size());
+        Map<Long, S3StreamMetadataImage> newStreams = new HashMap<>(image.streamsMetadata());
+        Map<Integer, BrokerS3WALMetadataImage> newBrokerStreams = new HashMap<>(image.brokerWALMetadata());
+
         // apply the delta changes of old streams since the last image
-        image.getStreamsMetadata().forEach((streamId, streamMetadataImage) -> {
-            S3StreamMetadataDelta delta = changedStreams.get(streamId);
-            if (delta == null) {
-                // no change, check if deleted
-                if (!deletedStreams.contains(streamId)) {
-                    newStreams.put(streamId, streamMetadataImage);
-                }
-            } else {
-                // changed, apply the delta
-                S3StreamMetadataImage newS3StreamMetadataImage = delta.apply();
-                newStreams.put(streamId, newS3StreamMetadataImage);
-            }
+        this.changedStreams.forEach((streamId, delta) -> {
+            S3StreamMetadataImage newS3StreamMetadataImage = delta.apply();
+            newStreams.put(streamId, newS3StreamMetadataImage);
         });
-        // apply the new created streams
-        changedStreams.entrySet().stream().filter(entry -> !newStreams.containsKey(entry.getKey()))
-            .forEach(entry -> {
-                S3StreamMetadataImage newS3StreamMetadataImage = entry.getValue().apply();
-                newStreams.put(entry.getKey(), newS3StreamMetadataImage);
-            });
+        // remove the deleted streams
+        deletedStreams.forEach(newStreams::remove);
 
         // apply the delta changes of old brokers since the last image
-        image.getBrokerWALMetadata().forEach((brokerId, brokerStreamMetadataImage) -> {
-            BrokerS3WALMetadataDelta delta = changedBrokers.get(brokerId);
-            if (delta == null) {
-                // no change, check if deleted
-                if (!deletedBrokers.contains(brokerId)) {
-                    newBrokerStreams.put(brokerId, brokerStreamMetadataImage);
-                }
-            } else {
-                // changed, apply the delta
-                BrokerS3WALMetadataImage newBrokerS3WALMetadataImage = delta.apply();
-                newBrokerStreams.put(brokerId, newBrokerS3WALMetadataImage);
-            }
+        this.changedBrokers.forEach((brokerId, delta) -> {
+            BrokerS3WALMetadataImage newBrokerS3WALMetadataImage = delta.apply();
+            newBrokerStreams.put(brokerId, newBrokerS3WALMetadataImage);
         });
-        // apply the new created streams
-        changedBrokers.entrySet().stream().filter(entry -> !newBrokerStreams.containsKey(entry.getKey()))
-            .forEach(entry -> {
-                BrokerS3WALMetadataImage newBrokerS3WALMetadataImage = entry.getValue().apply();
-                newBrokerStreams.put(entry.getKey(), newBrokerS3WALMetadataImage);
-            });
-
-        return new S3StreamsMetadataImage(newStreams, newBrokerStreams);
+        // remove the deleted brokers
+        deletedBrokers.forEach(newBrokerStreams::remove);
+
+        return new S3StreamsMetadataImage(currentAssignedStreamId, newStreams, newBrokerStreams);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index 727f4ee3e5..fbfc458267 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -20,21 +20,27 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 
 public final class S3StreamsMetadataImage {
 
     public static final S3StreamsMetadataImage EMPTY =
-        new S3StreamsMetadataImage(Collections.emptyMap(), Collections.emptyMap());
+        new S3StreamsMetadataImage(-1, Collections.emptyMap(), Collections.emptyMap());
+
+    private long nextAssignedStreamId;
 
     private final Map<Long, S3StreamMetadataImage> streamsMetadata;
 
     private final Map<Integer, BrokerS3WALMetadataImage> brokerWALMetadata;
 
     public S3StreamsMetadataImage(
+        long assignedStreamId,
         Map<Long, S3StreamMetadataImage> streamsMetadata,
         Map<Integer, BrokerS3WALMetadataImage> brokerWALMetadata) {
+        this.nextAssignedStreamId = assignedStreamId + 1;
         this.streamsMetadata = streamsMetadata;
         this.brokerWALMetadata = brokerWALMetadata;
     }
@@ -45,28 +51,41 @@ boolean isEmpty() {
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
+        writer.write(
+            new ApiMessageAndVersion(
+                new AssignedStreamIdRecord().setAssignedStreamId(nextAssignedStreamId - 1), (short) 0));
         streamsMetadata.values().forEach(image -> image.write(writer, options));
         brokerWALMetadata.values().forEach(image -> image.write(writer, options));
     }
 
     @Override
     public boolean equals(Object obj) {
-        if (!(obj instanceof S3StreamsMetadataImage)) return false;
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
         S3StreamsMetadataImage other = (S3StreamsMetadataImage) obj;
-        return this.streamsMetadata.equals(other.streamsMetadata)
+        return this.nextAssignedStreamId == other.nextAssignedStreamId
+            && this.streamsMetadata.equals(other.streamsMetadata)
             && this.brokerWALMetadata.equals(other.brokerWALMetadata);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(streamsMetadata, brokerWALMetadata);
+        return Objects.hash(nextAssignedStreamId, streamsMetadata, brokerWALMetadata);
     }
 
-    public Map<Integer, BrokerS3WALMetadataImage> getBrokerWALMetadata() {
+    public Map<Integer, BrokerS3WALMetadataImage> brokerWALMetadata() {
         return brokerWALMetadata;
     }
 
-    public Map<Long, S3StreamMetadataImage> getStreamsMetadata() {
+    public Map<Long, S3StreamMetadataImage> streamsMetadata() {
         return streamsMetadata;
     }
+
+    public long nextAssignedStreamId() {
+        return nextAssignedStreamId;
+    }
 }
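Note the off-by-one convention these two classes share: the image stores nextAssignedStreamId = assignedStreamId + 1, while write() emits the last assigned id (nextAssignedStreamId - 1), so a snapshot round trip rebuilds the same counter. A sketch of the invariant (values are hypothetical):

    S3StreamsMetadataImage image = new S3StreamsMetadataImage(10, Map.of(), Map.of());
    // image.nextAssignedStreamId() == 11
    // write() emits AssignedStreamIdRecord(assignedStreamId = 10);
    // replaying it sets the delta's currentAssignedStreamId = 10, and apply()
    // builds new S3StreamsMetadataImage(10, ...), i.e. nextAssignedStreamId == 11 again
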
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
index 067719fc33..dcfadedf6e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.metadata.stream;
 
+import java.util.Objects;
 import org.apache.kafka.common.metadata.RangeRecord;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
@@ -85,4 +86,22 @@ public static RangeMetadata of(RangeRecord record) {
         );
         return rangeMetadata;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RangeMetadata that = (RangeMetadata) o;
+        return streamId == that.streamId && epoch == that.epoch && rangeIndex == that.rangeIndex && startOffset == that.startOffset
+            && endOffset == that.endOffset && brokerId == that.brokerId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(streamId, epoch, rangeIndex, startOffset, endOffset, brokerId);
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
index 97c3ae1ce1..9445b97de3 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
@@ -109,7 +109,7 @@ public boolean equals(Object o) {
             return false;
         }
         S3Object s3Object = (S3Object) o;
-        return Objects.equals(objectId, s3Object.objectId);
+        return objectId == s3Object.objectId;
     }
 
     @Override
diff --git a/metadata/src/main/resources/common/metadata/BrokerWALMetadataRecord.json b/metadata/src/main/resources/common/metadata/BrokerWALMetadataRecord.json
new file mode 100644
index 0000000000..3171199b01
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/BrokerWALMetadataRecord.json
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 513,
+  "type": "metadata",
+  "name": "BrokerWALMetadataRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {
+      "name": "BrokerId",
+      "type": "int32",
+      "versions": "0+",
+      "about": "The Broker ID"
+    }
+  ]
+}
\ No newline at end of file
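The generated classes for this record and the RemoveBrokerWALMetadataRecord below form an add/remove pair with resurrect semantics in S3StreamsMetadataDelta: a remove record tombstones the broker, and a later add record for the same id cancels the tombstone. A sketch of that interaction, using only the replay methods added in this patch:

    // Hypothetical replay sequence against an existing image:
    S3StreamsMetadataDelta delta = new S3StreamsMetadataDelta(image);
    delta.replay(new RemoveBrokerWALMetadataRecord().setBrokerId(1)); // deletedBrokers += 1
    delta.replay(new BrokerWALMetadataRecord().setBrokerId(1));       // tombstone cancelled
    // delta.apply() keeps broker 1, because the second replay removed
    // the id from deletedBrokers again
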
diff --git a/metadata/src/main/resources/common/metadata/RemoveBrokerWALMetadataRecord.json b/metadata/src/main/resources/common/metadata/RemoveBrokerWALMetadataRecord.json
new file mode 100644
index 0000000000..5b01ca0a0f
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/RemoveBrokerWALMetadataRecord.json
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 514,
+  "type": "metadata",
+  "name": "RemoveBrokerWALMetadataRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {
+      "name": "BrokerId",
+      "type": "int32",
+      "versions": "0+",
+      "about": "The ID of the broker to be removed"
+    }
+  ]
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataTest.java b/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java
similarity index 91%
rename from metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataTest.java
rename to metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java
index f373224ed8..af5d2016f9 100644
--- a/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
 import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
 import org.apache.kafka.common.metadata.WALObjectRecord;
 import org.apache.kafka.image.writer.ImageWriterOptions;
@@ -30,11 +31,13 @@
 import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
 import org.apache.kafka.metadata.stream.S3WALObject;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 @Timeout(value = 40)
-public class BrokerS3WALMetadataTest {
+@Tag("S3Unit")
+public class BrokerS3WALMetadataImageTest {
 
     private static final int BROKER0 = 0;
 
@@ -48,6 +51,8 @@ public void testS3WALObjects() {
         List<ApiMessageAndVersion> delta0Records = new ArrayList<>();
         BrokerS3WALMetadataDelta delta0 = new BrokerS3WALMetadataDelta(image0);
         // 1. create WALObject0 and WALObject1
+        delta0Records.add(new ApiMessageAndVersion(new BrokerWALMetadataRecord()
+            .setBrokerId(BROKER0), (short) 0));
         delta0Records.add(new ApiMessageAndVersion(new WALObjectRecord()
             .setObjectId(0L)
             .setBrokerId(BROKER0)
@@ -97,9 +102,9 @@ private void testToImageAndBack(BrokerS3WALMetadataImage image) {
         RecordListWriter writer = new RecordListWriter();
         ImageWriterOptions options = new ImageWriterOptions.Builder().build();
         image.write(writer, options);
-        S3ObjectsDelta delta = new S3ObjectsDelta(S3ObjectsImage.EMPTY);
+        BrokerS3WALMetadataDelta delta = new BrokerS3WALMetadataDelta(BrokerS3WALMetadataImage.EMPTY);
         RecordTestUtils.replayAll(delta, writer.records());
-        S3ObjectsImage newImage = delta.apply();
+        BrokerS3WALMetadataImage newImage = delta.apply();
         assertEquals(image, newImage);
     }
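Beyond the copy-paste repair (testToImageAndBack previously round-tripped through S3ObjectsDelta), this test now exercises the new brokerId plumbing end to end: the rebuilt delta starts from BrokerS3WALMetadataImage.EMPTY (brokerId -1), so the assertion only holds because write() emits a BrokerWALMetadataRecord before the WAL objects. A trimmed sketch of the pattern:

    RecordListWriter writer = new RecordListWriter();
    image.write(writer, new ImageWriterOptions.Builder().build()); // BrokerWALMetadataRecord comes first
    BrokerS3WALMetadataDelta delta = new BrokerS3WALMetadataDelta(BrokerS3WALMetadataImage.EMPTY);
    RecordTestUtils.replayAll(delta, writer.records());            // restores brokerId from that record
    assertEquals(image, delta.apply());                            // would fail with brokerId stuck at -1
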
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
index 6cf808a28a..faa9d0e7db 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
@@ -33,10 +33,12 @@
 import org.apache.kafka.metadata.stream.RangeMetadata;
 import org.apache.kafka.metadata.stream.S3StreamObject;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 @Timeout(value = 40)
+@Tag("S3Unit")
 public class S3StreamMetadataImageTest {
 
     private static final long STREAM0 = 0L;
@@ -58,7 +60,7 @@ public void testRanges() {
         RecordTestUtils.replayAll(delta0, delta0Records);
         // verify delta and check image's write
         S3StreamMetadataImage image1 = new S3StreamMetadataImage(
-            STREAM0, 0L, 0L, Map.of(), List.of());
+            STREAM0, 0L, 0L, Map.of(), Map.of());
         assertEquals(image1, delta0.apply());
         testToImageAndBack(image1);
 
@@ -79,7 +81,9 @@ public void testRanges() {
         RecordTestUtils.replayAll(delta1, delta1Records);
         // verify delta and check image's write
         S3StreamMetadataImage image2 = new S3StreamMetadataImage(
-            STREAM0, 1L, 0L, Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, -1L, BROKER0)), List.of());
+            STREAM0, 1L, 0L,
+            Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Map.of());
+        S3StreamMetadataImage image = delta1.apply();
         assertEquals(image2, delta1.apply());
         testToImageAndBack(image2);
 
@@ -102,13 +106,14 @@ public void testRanges() {
             .setRangeIndex(1)
             .setEpoch(2L)
             .setBrokerId(BROKER1)
-            .setStartOffset(101L), (short) 0));
+            .setStartOffset(100L)
+            .setEndOffset(100L), (short) 0));
         RecordTestUtils.replayAll(delta2, delta2Records);
         // verify delta and check image's write
         S3StreamMetadataImage image3 = new S3StreamMetadataImage(
             STREAM0, 2L, 0L, Map.of(
-            0, new RangeMetadata(STREAM0, 1L, 0, 0L, 100, BROKER0),
-            1, new RangeMetadata(STREAM0, 2L, 1, 101L, 100, BROKER1)), List.of());
+            0, new RangeMetadata(STREAM0, 1L, 0, 0, 100, BROKER0),
+            1, new RangeMetadata(STREAM0, 2L, 1, 100, 100, BROKER1)), Map.of());
         assertEquals(image3, delta2.apply());
         testToImageAndBack(image3);
 
@@ -118,21 +123,22 @@ public void testRanges() {
         delta3Records.add(new ApiMessageAndVersion(new S3StreamRecord()
             .setStreamId(STREAM0)
             .setEpoch(2L)
-            .setStartOffset(101L), (short) 0));
+            .setStartOffset(100L), (short) 0));
         delta3Records.add(new ApiMessageAndVersion(new RemoveRangeRecord()
             .setStreamId(STREAM0)
             .setRangeIndex(0), (short) 0));
         RecordTestUtils.replayAll(delta3, delta3Records);
         // verify delta and check image's write
         S3StreamMetadataImage image4 = new S3StreamMetadataImage(
-            STREAM0, 2L, 101L, Map.of(
-            1, new RangeMetadata(STREAM0, 2L, 1, 101L, 100L, BROKER1)), List.of());
+            STREAM0, 2L, 100L, Map.of(
+            1, new RangeMetadata(STREAM0, 2L, 1, 100L, 100L, BROKER1)), Map.of());
+
         assertEquals(image4, delta3.apply());
     }
 
     @Test
     public void testStreamObjects() {
         S3StreamMetadataImage image0 = new S3StreamMetadataImage(
-            STREAM0, 0L, 0L, Map.of(), List.of());
+            STREAM0, 0L, 0L, Map.of(), Map.of());
         List<ApiMessageAndVersion> delta0Records = new ArrayList<>();
         S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0);
         // 1. create streamObject0 and streamObject1
@@ -144,14 +150,14 @@ public void testStreamObjects() {
         delta0Records.add(new ApiMessageAndVersion(new S3StreamObjectRecord()
             .setObjectId(1L)
             .setStreamId(STREAM0)
-            .setStartOffset(101L)
+            .setStartOffset(100L)
             .setEndOffset(200L), (short) 0));
         RecordTestUtils.replayAll(delta0, delta0Records);
         // verify delta and check image's write
         S3StreamMetadataImage image1 = new S3StreamMetadataImage(
-            STREAM0, 0L, 0L, Map.of(), List.of(
-            new S3StreamObject(0L, STREAM0, 0L, 100L),
-            new S3StreamObject(1L, STREAM0, 101L, 200L)));
+            STREAM0, 0L, 0L, Map.of(), Map.of(
+            0L, new S3StreamObject(0L, STREAM0, 0L, 100L),
+            1L, new S3StreamObject(1L, STREAM0, 100L, 200L)));
         assertEquals(image1, delta0.apply());
         testToImageAndBack(image1);
 
@@ -163,8 +169,8 @@ public void testStreamObjects() {
         RecordTestUtils.replayAll(delta1, delta1Records);
         // verify delta and check image's write
         S3StreamMetadataImage image2 = new S3StreamMetadataImage(
-            STREAM0, 0L, 0L, Map.of(), List.of(
-            new S3StreamObject(1L, STREAM0, 101L, 200L)));
+            STREAM0, 0L, 0L, Map.of(), Map.of(
+            1L, new S3StreamObject(1L, STREAM0, 100L, 200L)));
         assertEquals(image2, delta1.apply());
         testToImageAndBack(image2);
     }
@@ -173,9 +179,9 @@ private void testToImageAndBack(S3StreamMetadataImage image) {
         RecordListWriter writer = new RecordListWriter();
         ImageWriterOptions options = new ImageWriterOptions.Builder().build();
         image.write(writer, options);
-        S3ObjectsDelta delta = new S3ObjectsDelta(S3ObjectsImage.EMPTY);
+        S3StreamMetadataDelta delta = new S3StreamMetadataDelta(S3StreamMetadataImage.EMPTY);
         RecordTestUtils.replayAll(delta, writer.records());
-        S3ObjectsImage newImage = delta.apply();
+        S3StreamMetadataImage newImage = delta.apply();
         assertEquals(image, newImage);
     }
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index cc5d33a66a..5e45ff11d9 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -17,12 +17,21 @@
 
 package org.apache.kafka.image;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
 import java.util.List;
+import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.image.writer.RecordListWriter;
+import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 @Timeout(value = 40)
+@Tag("S3Unit")
 public class S3StreamsMetadataImageTest {
 
     private static final long KB = 1024;
@@ -46,7 +55,31 @@ public class S3StreamsMetadataImageTest {
     }
 
     @Test
-    public void testBasicChange() {
+    public void testAssignedChange() {
+        S3StreamsMetadataImage image0 = S3StreamsMetadataImage.EMPTY;
+        ApiMessageAndVersion record0 = new ApiMessageAndVersion(new AssignedStreamIdRecord()
+            .setAssignedStreamId(0), (short) 0);
+        S3StreamsMetadataDelta delta0 = new S3StreamsMetadataDelta(image0);
+        RecordTestUtils.replayAll(delta0, List.of(record0));
+        S3StreamsMetadataImage image1 = new S3StreamsMetadataImage(0, Collections.emptyMap(), Collections.emptyMap());
+        assertEquals(image1, delta0.apply());
+        testToImageAndBack(image1);
+
+        ApiMessageAndVersion record1 = new ApiMessageAndVersion(new AssignedStreamIdRecord()
+            .setAssignedStreamId(10), (short) 0);
+        S3StreamsMetadataDelta delta1 = new S3StreamsMetadataDelta(image1);
+        RecordTestUtils.replayAll(delta1, List.of(record1));
+        S3StreamsMetadataImage image2 = new S3StreamsMetadataImage(10, Collections.emptyMap(), Collections.emptyMap());
+        assertEquals(image2, delta1.apply());
+    }
+
+    private void testToImageAndBack(S3StreamsMetadataImage image) {
+        RecordListWriter writer = new RecordListWriter();
+        ImageWriterOptions options = new ImageWriterOptions.Builder().build();
+        image.write(writer, options);
+        S3StreamsMetadataDelta delta = new S3StreamsMetadataDelta(S3StreamsMetadataImage.EMPTY);
+        RecordTestUtils.replayAll(delta, writer.records());
+        S3StreamsMetadataImage newImage = delta.apply();
+        assertEquals(image, newImage);
     }
 }
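Taken together with the MetadataDelta switch cases above, the new record types now flow from the top-level delta into the streams sub-delta. A sketch of that end-to-end path, assuming the Builder exposes the usual build() method (not shown in this patch):

    // Hypothetical end-to-end replay through the top-level delta:
    MetadataDelta delta = new MetadataDelta.Builder().setImage(MetadataImage.EMPTY).build();
    delta.replay(new BrokerWALMetadataRecord().setBrokerId(0));       // routed to the streams sub-delta
    delta.replay(new RemoveBrokerWALMetadataRecord().setBrokerId(0)); // tombstones the broker again
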