diff --git a/clients/src/main/java/org/apache/kafka/common/errors/s3/NodeFencedException.java b/clients/src/main/java/org/apache/kafka/common/errors/s3/NodeFencedException.java new file mode 100644 index 0000000000..8e4c816aa1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/s3/NodeFencedException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors.s3; + +import org.apache.kafka.common.errors.ApiException; + +public class NodeFencedException extends ApiException { + + public NodeFencedException(String message) { + super(message); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 0565b0c40a..9f725e1d02 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -131,6 +131,7 @@ import org.apache.kafka.common.errors.s3.NodeEpochNotExistException; import org.apache.kafka.common.errors.s3.CompactedObjectsNotFoundException; import org.apache.kafka.common.errors.s3.KeyExistException; +import org.apache.kafka.common.errors.s3.NodeFencedException; import org.apache.kafka.common.errors.s3.ObjectNotExistException; import org.apache.kafka.common.errors.s3.OffsetNotMatchedException; import org.apache.kafka.common.errors.s3.RedundantOperationException; @@ -400,6 +401,7 @@ public enum Errors { NODE_EPOCH_NOT_EXIST(511, "The node's epoch does not exist", NodeEpochNotExistException::new), KEY_EXIST(512, "The key already exists.", KeyExistException::new), KEY_NOT_EXIST(513, "The key does not exist.", ObjectNotExistException::new), + NODE_FENCED(514, "The node is fenced.", NodeFencedException::new), STREAM_INNER_ERROR(599, "The stream inner error.", StreamInnerErrorException::new); // AutoMQ for Kafka inject end diff --git a/clients/src/main/resources/common/message/CommitStreamSetObjectRequest.json b/clients/src/main/resources/common/message/CommitStreamSetObjectRequest.json index 0c017441e1..c16ee628ee 100644 --- a/clients/src/main/resources/common/message/CommitStreamSetObjectRequest.json +++ b/clients/src/main/resources/common/message/CommitStreamSetObjectRequest.json @@ -131,6 +131,12 @@ "type": "[]int64", "versions": "0+", "about": "The IDs of the compacted S3 objects" + }, + { + "name": "FailoverMode", + "type": "bool", + "versions": "0+", + "about": "The failover mode enabled or not" } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/GetOpeningStreamsRequest.json b/clients/src/main/resources/common/message/GetOpeningStreamsRequest.json index 01263ca118..084403aa98 100644 --- 
a/clients/src/main/resources/common/message/GetOpeningStreamsRequest.json +++ b/clients/src/main/resources/common/message/GetOpeningStreamsRequest.json @@ -35,6 +35,12 @@ "type": "int64", "versions": "0+", "about": "The node epoch." + }, + { + "name": "FailoverMode", + "type": "bool", + "versions": "0+", + "about": "The failover mode enabled or not." } ] } \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java index 0bb22c3f14..c01d6e5757 100644 --- a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java +++ b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java @@ -24,7 +24,7 @@ public class ConfigUtils { public static Config to(KafkaConfig s) { return new Config() - .nodeId(s.brokerId()) + .nodeId(s.nodeId()) .endpoint(s.s3Endpoint()) .region(s.s3Region()) .bucket(s.s3Bucket()) @@ -56,8 +56,9 @@ public static Config to(KafkaConfig s) { .mockEnable(s.s3MockEnable()) .objectLogEnable(s.s3ObjectLogEnable()) .networkBaselineBandwidth(s.s3NetworkBaselineBandwidthProp()) - .refillPeriodMs(s.s3RefillPeriodMsProp()); - + .refillPeriodMs(s.s3RefillPeriodMsProp()) + .objectRetentionTimeInSecond(s.s3ObjectRetentionTimeInSecond()) + .failoverEnable(s.s3FailoverEnable()); } } diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java index 0a72522890..ffb0df554f 100644 --- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java @@ -43,10 +43,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.kafka.metadata.stream.S3Config.ACCESS_KEY_NAME; -import static org.apache.kafka.metadata.stream.S3Config.SECRET_KEY_NAME; - public class DefaultS3Client implements Client { + public static final String ACCESS_KEY_NAME = "KAFKA_S3_ACCESS_KEY"; + public static final String SECRET_KEY_NAME = "KAFKA_S3_SECRET_KEY"; + private final static Logger LOGGER = LoggerFactory.getLogger(DefaultS3Client.class); private final Config config; private final StreamMetadataManager metadataManager; diff --git a/core/src/main/scala/kafka/log/stream/s3/failover/DefaultFailoverFactory.java b/core/src/main/scala/kafka/log/stream/s3/failover/DefaultFailoverFactory.java new file mode 100644 index 0000000000..47a0d1bd63 --- /dev/null +++ b/core/src/main/scala/kafka/log/stream/s3/failover/DefaultFailoverFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.log.stream.s3.failover;
+
+import com.automq.stream.s3.failover.FailoverFactory;
+import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metadata.StreamMetadata;
+import com.automq.stream.s3.objects.CommitStreamSetObjectRequest;
+import com.automq.stream.s3.objects.CommitStreamSetObjectResponse;
+import com.automq.stream.s3.objects.CompactStreamObjectRequest;
+import com.automq.stream.s3.objects.ObjectManager;
+import com.automq.stream.s3.streams.StreamManager;
+import kafka.log.stream.s3.objects.ControllerObjectManager;
+import kafka.log.stream.s3.streams.ControllerStreamManager;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public class DefaultFailoverFactory implements FailoverFactory {
+    private final ControllerStreamManager streamManager;
+    private final ControllerObjectManager objectManager;
+
+    public DefaultFailoverFactory(ControllerStreamManager streamManager, ControllerObjectManager objectManager) {
+        this.streamManager = streamManager;
+        this.objectManager = objectManager;
+    }
+
+    @Override
+    public StreamManager getStreamManager(int nodeId, long epoch) {
+        return new StreamManager() {
+            @Override
+            public CompletableFuture<List<StreamMetadata>> getOpeningStreams() {
+                return streamManager.getOpeningStreams(nodeId, epoch, true);
+            }
+
+            @Override
+            public CompletableFuture<List<StreamMetadata>> getStreams(List<Long> list) {
+                return CompletableFuture.failedFuture(new UnsupportedOperationException());
+            }
+
+            @Override
+            public CompletableFuture<Long> createStream() {
+                return CompletableFuture.failedFuture(new UnsupportedOperationException());
+            }
+
+            @Override
+            public CompletableFuture<StreamMetadata> openStream(long streamId, long epoch) {
+                return CompletableFuture.failedFuture(new UnsupportedOperationException());
+            }
+
+            @Override
+            public CompletableFuture<Void> trimStream(long streamId, long epoch, long offset) {
+                return CompletableFuture.failedFuture(new UnsupportedOperationException());
+            }
+
+            @Override
+            public CompletableFuture<Void> closeStream(long streamId, long streamEpoch) {
+                return streamManager.closeStream(streamId, streamEpoch);
+            }
+
+            @Override
+            public CompletableFuture<Void> deleteStream(long streamId, long epoch) {
+                return CompletableFuture.failedFuture(new UnsupportedOperationException());
+            }
+        };
+    }
+
+    @Override
+    public ObjectManager getObjectManager(int nodeId, long epoch) {
+        return new ObjectManager() {
+            @Override
+            public CompletableFuture<Long> prepareObject(int count, long ttl) {
+                return objectManager.prepareObject(count, ttl);
+            }
+
+            @Override
+            public CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(CommitStreamSetObjectRequest commitStreamSetObjectRequest) {
+                return objectManager.commitStreamSetObject(commitStreamSetObjectRequest, nodeId, epoch, true);
+            }
+
+            @Override
+            public CompletableFuture<Void> compactStreamObject(CompactStreamObjectRequest compactStreamObjectRequest) {
+                return CompletableFuture.failedFuture(new UnsupportedOperationException());
+            }
+
+            @Override
+            public CompletableFuture<List<S3ObjectMetadata>> getObjects(long streamId, long startOffset, long endOffset, int limit) {
+                return CompletableFuture.failedFuture(new UnsupportedOperationException());
+            }
+
+            @Override
+            public CompletableFuture<List<S3ObjectMetadata>> getServerObjects() {
+                return CompletableFuture.failedFuture(new UnsupportedOperationException());
+            }
+
+            @Override
+            public CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
+                return CompletableFuture.failedFuture(new UnsupportedOperationException());
+            }
+        };
+    }
+}
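Reviewer note: the sketch below is not part of the patch; it only illustrates how this factory is meant to be driven during a failover. It assumes nothing beyond the FailoverFactory, StreamManager and ObjectManager methods visible in this diff, except the streamId()/epoch() accessors on StreamMetadata, which are assumptions, and the hypothetical driver class/method names.

```java
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.streams.StreamManager;
import kafka.log.stream.s3.failover.DefaultFailoverFactory;

import java.util.List;

// Hypothetical recovery driver, not part of this patch. Only the FailoverFactory,
// StreamManager and ObjectManager methods visible in the diff are assumed; the
// streamId()/epoch() accessors on StreamMetadata are also assumptions.
public class FailoverRecoverySketch {

    public static void recover(DefaultFailoverFactory factory, int failedNodeId, long failedNodeEpoch) {
        // The manager is bound to the *failed* node's id/epoch, so every request below
        // is sent on its behalf (failoverMode=true on the wire).
        StreamManager streamManager = factory.getStreamManager(failedNodeId, failedNodeEpoch);

        // Streams the failed node still holds open.
        List<StreamMetadata> openingStreams = streamManager.getOpeningStreams().join();

        for (StreamMetadata stream : openingStreams) {
            // Recover the stream tail from the failed node's volume, upload it, and commit it
            // through factory.getObjectManager(failedNodeId, failedNodeEpoch).commitStreamSetObject(...)
            // (request construction elided), then close the stream under its last epoch so a
            // healthy node can reopen it.
            streamManager.closeStream(stream.streamId(), stream.epoch()).join();
        }
    }
}
```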
diff --git a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java
index b3f708b44c..5992f97dc2 100644
--- a/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java
+++ b/core/src/main/scala/kafka/log/stream/s3/objects/ControllerObjectManager.java
@@ -29,10 +29,10 @@
 import kafka.log.stream.s3.network.ControllerRequestSender.ResponseHandleResult;
 import kafka.log.stream.s3.network.request.WrapRequest;
 import kafka.server.KafkaConfig;
-import org.apache.kafka.common.message.CommitStreamSetObjectRequestData;
-import org.apache.kafka.common.message.CommitStreamSetObjectResponseData;
 import org.apache.kafka.common.message.CommitStreamObjectRequestData;
 import org.apache.kafka.common.message.CommitStreamObjectResponseData;
+import org.apache.kafka.common.message.CommitStreamSetObjectRequestData;
+import org.apache.kafka.common.message.CommitStreamSetObjectResponseData;
 import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
 import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -102,6 +102,11 @@ public Builder toRequestBuilder() {
     @Override
     public CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(CommitStreamSetObjectRequest commitStreamSetObjectRequest) {
+        return commitStreamSetObject(commitStreamSetObjectRequest, nodeId, nodeEpoch, false);
+    }
+
+    public CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(CommitStreamSetObjectRequest commitStreamSetObjectRequest,
+        int nodeId, long nodeEpoch, boolean failoverMode) {
         CommitStreamSetObjectRequestData request = new CommitStreamSetObjectRequestData()
             .setNodeId(nodeId)
             .setNodeEpoch(nodeEpoch)
@@ -114,7 +119,8 @@ public CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(Co
             .setStreamObjects(commitStreamSetObjectRequest.getStreamObjects()
                 .stream()
                 .map(Convertor::toStreamObjectInRequest).collect(Collectors.toList()))
-            .setCompactedObjectIds(commitStreamSetObjectRequest.getCompactedObjectIds());
+            .setCompactedObjectIds(commitStreamSetObjectRequest.getCompactedObjectIds())
+            .setFailoverMode(failoverMode);
         WrapRequest req = new WrapRequest() {
             @Override
             public ApiKeys apiKey() {
diff --git a/core/src/main/scala/kafka/log/stream/s3/streams/ControllerStreamManager.java b/core/src/main/scala/kafka/log/stream/s3/streams/ControllerStreamManager.java
index b4ed97e2d2..7739ba1586 100644
--- a/core/src/main/scala/kafka/log/stream/s3/streams/ControllerStreamManager.java
+++ b/core/src/main/scala/kafka/log/stream/s3/streams/ControllerStreamManager.java
@@ -17,11 +17,13 @@
 package kafka.log.stream.s3.streams;
+import com.automq.stream.s3.metadata.StreamMetadata;
+import com.automq.stream.s3.metadata.StreamState;
 import com.automq.stream.s3.streams.StreamManager;
 import kafka.log.stream.s3.metadata.StreamMetadataManager;
 import kafka.log.stream.s3.network.ControllerRequestSender;
-import kafka.log.stream.s3.network.ControllerRequestSender.ResponseHandleResult;
 import kafka.log.stream.s3.network.ControllerRequestSender.RequestTask;
+import kafka.log.stream.s3.network.ControllerRequestSender.ResponseHandleResult;
 import kafka.log.stream.s3.network.request.BatchRequest;
 import kafka.log.stream.s3.network.request.WrapRequest;
 import kafka.server.KafkaConfig;
@@ -52,8 +54,6 @@
 import org.apache.kafka.common.requests.s3.GetOpeningStreamsResponse;
 import org.apache.kafka.common.requests.s3.OpenStreamsRequest;
 import org.apache.kafka.common.requests.s3.TrimStreamsRequest;
-import 
com.automq.stream.s3.metadata.StreamMetadata; -import com.automq.stream.s3.metadata.StreamState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,9 +80,14 @@ public ControllerStreamManager(StreamMetadataManager streamMetadataManager, Cont @Override public CompletableFuture> getOpeningStreams() { + return getOpeningStreams(nodeId, nodeEpoch, false); + } + + public CompletableFuture> getOpeningStreams(int nodeId, long nodeEpoch, boolean failoverMode) { GetOpeningStreamsRequestData request = new GetOpeningStreamsRequestData() - .setNodeId(nodeId) - .setNodeEpoch(nodeEpoch); + .setNodeId(nodeId) + .setNodeEpoch(nodeEpoch) + .setFailoverMode(failoverMode); WrapRequest req = new WrapRequest() { @Override public ApiKeys apiKey() { @@ -97,22 +102,22 @@ public Builder toRequestBuilder() { CompletableFuture> future = new CompletableFuture<>(); RequestTask> task = new RequestTask>(req, future, - response -> { - GetOpeningStreamsResponseData resp = response.data(); - Errors code = Errors.forCode(resp.errorCode()); - switch (code) { - case NONE: - return ResponseHandleResult.withSuccess(resp.streamMetadataList().stream() - .map(m -> new StreamMetadata(m.streamId(), m.epoch(), m.startOffset(), m.endOffset(), StreamState.OPENED)) - .collect(Collectors.toList())); - case NODE_EPOCH_EXPIRED: - LOGGER.error("Node epoch expired: {}, code: {}", req, code); - throw code.exception(); - default: - LOGGER.error("Error while getting streams offset: {}, code: {}, retry later", req, code); - return ResponseHandleResult.withRetry(); - } - }); + response -> { + GetOpeningStreamsResponseData resp = response.data(); + Errors code = Errors.forCode(resp.errorCode()); + switch (code) { + case NONE: + return ResponseHandleResult.withSuccess(resp.streamMetadataList().stream() + .map(m -> new StreamMetadata(m.streamId(), m.epoch(), m.startOffset(), m.endOffset(), StreamState.OPENED)) + .collect(Collectors.toList())); + case NODE_EPOCH_EXPIRED: + LOGGER.error("Node epoch expired: {}, code: {}", req, code); + throw code.exception(); + default: + LOGGER.error("Error while getting streams offset: {}, code: {}, retry later", req, code); + return ResponseHandleResult.withRetry(); + } + }); this.requestSender.send(task); return future; } @@ -141,9 +146,9 @@ public ApiKeys apiKey() { @Override public Builder toRequestBuilder() { return new CreateStreamsRequest.Builder( - new CreateStreamsRequestData() - .setNodeId(nodeId) - .setNodeEpoch(nodeEpoch)).addSubRequest(request); + new CreateStreamsRequestData() + .setNodeId(nodeId) + .setNodeEpoch(nodeEpoch)).addSubRequest(request); } }; CompletableFuture future = new CompletableFuture<>(); @@ -167,8 +172,8 @@ public Builder toRequestBuilder() { @Override public CompletableFuture openStream(long streamId, long epoch) { OpenStreamRequest request = new OpenStreamRequest() - .setStreamId(streamId) - .setStreamEpoch(epoch); + .setStreamId(streamId) + .setStreamEpoch(epoch); WrapRequest req = new BatchRequest() { @Override public Builder addSubRequest(Builder builder) { @@ -185,9 +190,9 @@ public ApiKeys apiKey() { @Override public Builder toRequestBuilder() { return new OpenStreamsRequest.Builder( - new OpenStreamsRequestData() - .setNodeId(nodeId) - .setNodeEpoch(nodeEpoch)).addSubRequest(request); + new OpenStreamsRequestData() + .setNodeId(nodeId) + .setNodeEpoch(nodeEpoch)).addSubRequest(request); } }; CompletableFuture future = new CompletableFuture<>(); @@ -196,7 +201,7 @@ public Builder toRequestBuilder() { switch (code) { case NONE: return 
ResponseHandleResult.withSuccess( - new StreamMetadata(streamId, epoch, resp.startOffset(), resp.nextOffset(), StreamState.OPENED)); + new StreamMetadata(streamId, epoch, resp.startOffset(), resp.nextOffset(), StreamState.OPENED)); case NODE_EPOCH_EXPIRED: case NODE_EPOCH_NOT_EXIST: LOGGER.error("Node epoch expired or not exist: {}, code: {}", req, code); @@ -219,9 +224,9 @@ public Builder toRequestBuilder() { @Override public CompletableFuture trimStream(long streamId, long epoch, long newStartOffset) { TrimStreamRequest request = new TrimStreamRequest() - .setStreamId(streamId) - .setStreamEpoch(epoch) - .setNewStartOffset(newStartOffset); + .setStreamId(streamId) + .setStreamEpoch(epoch) + .setNewStartOffset(newStartOffset); WrapRequest req = new BatchRequest() { @Override public Builder addSubRequest(Builder builder) { @@ -238,9 +243,9 @@ public ApiKeys apiKey() { @Override public Builder toRequestBuilder() { return new TrimStreamsRequest.Builder( - new TrimStreamsRequestData() - .setNodeId(nodeId) - .setNodeEpoch(nodeEpoch)).addSubRequest(request); + new TrimStreamsRequestData() + .setNodeId(nodeId) + .setNodeEpoch(nodeEpoch)).addSubRequest(request); } }; CompletableFuture future = new CompletableFuture<>(); @@ -271,8 +276,8 @@ public Builder toRequestBuilder() { @Override public CompletableFuture closeStream(long streamId, long epoch) { CloseStreamRequest request = new CloseStreamRequest() - .setStreamId(streamId) - .setStreamEpoch(epoch); + .setStreamId(streamId) + .setStreamEpoch(epoch); WrapRequest req = new BatchRequest() { @Override public Builder addSubRequest(Builder builder) { @@ -289,9 +294,9 @@ public ApiKeys apiKey() { @Override public Builder toRequestBuilder() { return new CloseStreamsRequest.Builder( - new CloseStreamsRequestData() - .setNodeId(nodeId) - .setNodeEpoch(nodeEpoch)).addSubRequest(request); + new CloseStreamsRequestData() + .setNodeId(nodeId) + .setNodeEpoch(nodeEpoch)).addSubRequest(request); } }; CompletableFuture future = new CompletableFuture<>(); @@ -320,8 +325,8 @@ public Builder toRequestBuilder() { @Override public CompletableFuture deleteStream(long streamId, long epoch) { DeleteStreamRequest request = new DeleteStreamRequest() - .setStreamId(streamId) - .setStreamEpoch(epoch); + .setStreamId(streamId) + .setStreamEpoch(epoch); WrapRequest req = new BatchRequest() { @Override public Builder addSubRequest(Builder builder) { @@ -338,9 +343,9 @@ public ApiKeys apiKey() { @Override public Builder toRequestBuilder() { return new DeleteStreamsRequest.Builder( - new DeleteStreamsRequestData() - .setNodeId(nodeId) - .setNodeEpoch(nodeEpoch)).addSubRequest(request); + new DeleteStreamsRequestData() + .setNodeId(nodeId) + .setNodeEpoch(nodeEpoch)).addSubRequest(request); } }; CompletableFuture future = new CompletableFuture<>(); diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 492a601be9..ade9abc58f 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -21,6 +21,7 @@ import com.automq.stream.s3.metadata.ObjectUtils import kafka.autobalancer.AutoBalancerManager import kafka.autobalancer.config.AutoBalancerControllerConfig import kafka.cluster.Broker.ServerInfo +import kafka.log.stream.s3.ConfigUtils import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector} import kafka.migration.MigrationPropagator import kafka.network.{DataPlaneAcceptor, SocketServer} @@ -42,7 +43,6 @@ import 
org.apache.kafka.common.config.ConfigException import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer import org.apache.kafka.metadata.bootstrap.BootstrapMetadata import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator} -import org.apache.kafka.metadata.stream.S3Config import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.ApiMessageAndVersion @@ -213,7 +213,7 @@ class ControllerServer( val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of) // AutoMQ for Kafka inject start - val s3Config = new S3Config(config.s3Endpoint, config.s3Region, config.s3Bucket, config.s3ObjectRetentionTimeInSecond, config.s3MockEnable) + val streamConfig = ConfigUtils.to(config) var namespace = config.elasticStreamNamespace namespace = if (namespace == null || namespace.isEmpty) { "_kafka_" + clusterId @@ -243,7 +243,7 @@ class ControllerServer( setBootstrapMetadata(bootstrapMetadata). setFatalFaultHandler(sharedServer.quorumControllerFaultHandler). setZkMigrationEnabled(config.migrationEnabled). - setS3Config(s3Config). + setStreamConfig(streamConfig). setQuorumVoters(config.quorumVoters) } authorizer match { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e0622543cc..b1241a58d6 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -719,6 +719,7 @@ object KafkaConfig { val S3ObjectLogEnableProp = "s3.object.log.enable" val S3NetworkBaselineBandwidthProp = "s3.network.baseline.bandwidth" val S3RefillPeriodMsProp = "s3.network.refill.period.ms" + val S3FailoverEnableProp = "s3.failover.enable" val S3EndpointDoc = "The S3 endpoint, ex. https://s3.{region}.amazonaws.com." val S3RegionDoc = "The S3 region, ex. us-east-1." @@ -755,6 +756,7 @@ object KafkaConfig { val S3ObjectLogEnableDoc = "Whether to enable S3 object trace log." val S3NetworkBaselineBandwidthDoc = "The network baseline bandwidth in Bytes/s." val S3RefillPeriodMsDoc = "The network bandwidth token refill period in milliseconds." 
+ val S3FailoverEnableDoc = "Failover mode: if enable, the controller will scan failed node and failover the failed node" // AutoMQ for Kafka inject end @@ -1588,6 +1590,7 @@ object KafkaConfig { .define(S3ObjectLogEnableProp, BOOLEAN, false, LOW, S3ObjectLogEnableDoc) .define(S3NetworkBaselineBandwidthProp, LONG, Defaults.S3NetworkBaselineBandwidth, MEDIUM, S3NetworkBaselineBandwidthDoc) .define(S3RefillPeriodMsProp, INT, Defaults.S3RefillPeriodMs, MEDIUM, S3RefillPeriodMsDoc) + .define(S3FailoverEnableProp, BOOLEAN, false, MEDIUM, S3FailoverEnableDoc) // AutoMQ for Kafka inject end } @@ -2158,6 +2161,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val s3ObjectLogEnable = getBoolean(KafkaConfig.S3ObjectLogEnableProp) val s3NetworkBaselineBandwidthProp = getLong(KafkaConfig.S3NetworkBaselineBandwidthProp) val s3RefillPeriodMsProp = getInt(KafkaConfig.S3RefillPeriodMsProp) + val s3FailoverEnable = getBoolean(KafkaConfig.S3FailoverEnableProp) // AutoMQ for Kafka inject end def addReconfigurable(reconfigurable: Reconfigurable): Unit = { diff --git a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java index 35b7c5bf6c..aeda3b7fa7 100644 --- a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java +++ b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java @@ -114,7 +114,7 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L), S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), Map.of(BROKER0, walMetadataImage0)); - image0 = new MetadataImage(new MetadataProvenance(0, 0, 0), null, null, null, null, null, null, null, streamsImage, objectsImage, null); + image0 = new MetadataImage(new MetadataProvenance(0, 0, 0), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null); ranges = new HashMap<>(ranges); ranges.put(1, new RangeMetadata(STREAM0, 1L, 1, 100L, 150L, BROKER0)); @@ -123,7 +123,7 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L), streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), Map.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY)); - image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null); + image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null); ranges = new HashMap<>(ranges); ranges.put(2, new RangeMetadata(STREAM0, 2L, 2, 150L, 200L, BROKER0)); @@ -132,7 +132,7 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L), streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), Map.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY)); - image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null); + image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null); } @Test diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index 5c4c14e998..42f5ea30f8 100644 --- 
a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -560,5 +560,10 @@ public CompletableFuture deleteKVs(ControllerRequestConte throw new UnsupportedOperationException(); } + @Override + public CompletableFuture failover(ControllerRequestContext context) { + throw new UnsupportedOperationException(); + } + // AutoMQ for Kafka inject end } diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 07c0d3e6e5..5b25256ae7 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -74,7 +74,8 @@ object MetadataCacheTest { image.acls(), image.streamsMetadata(), image.objectsMetadata(), - image.kv()) + image.kv(), + image.failoverContext()) val delta = new MetadataDelta.Builder().setImage(partialImage).build() def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 7e0e50a74f..8a9ae9c292 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -54,7 +54,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, KVImage, MetadataImage, MetadataProvenance, ProducerIdsImage, S3ObjectsImage, S3StreamsMetadataImage, TopicsDelta, TopicsImage} +import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FailoverContextImage, FeaturesImage, KVImage, MetadataImage, MetadataProvenance, ProducerIdsImage, S3ObjectsImage, S3StreamsMetadataImage, TopicsDelta, TopicsImage} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 @@ -4131,7 +4131,8 @@ class ReplicaManagerTest { AclsImage.EMPTY, S3StreamsMetadataImage.EMPTY, S3ObjectsImage.EMPTY, - KVImage.EMPTY + KVImage.EMPTY, + FailoverContextImage.EMPTY ) } diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index ebb83ec56c..1c4e42abed 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -505,5 +505,7 @@ CompletableFuture deleteKVs( DeleteKVsRequestData request ); + CompletableFuture failover(ControllerRequestContext context); + // AutoMQ for Kafka inject end } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 31a0c4bb81..7a01c24870 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -17,6 +17,7 @@ package org.apache.kafka.controller; +import com.automq.stream.s3.Config; import com.automq.stream.s3.metrics.S3StreamMetricsRegistry; import com.automq.stream.s3.operator.DefaultS3Operator; 
import com.automq.stream.s3.operator.MemoryS3Operator; @@ -91,6 +92,7 @@ import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.ClientQuotaRecord; import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.metadata.FailoverContextRecord; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; import org.apache.kafka.common.metadata.KVRecord; @@ -127,6 +129,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.controller.stream.FailoverControlManager; import org.apache.kafka.controller.stream.KVControlManager; import org.apache.kafka.controller.stream.S3ObjectControlManager; import org.apache.kafka.controller.stream.StreamControlManager; @@ -140,7 +143,6 @@ import org.apache.kafka.metadata.migration.ZkRecordConsumer; import org.apache.kafka.metadata.placement.ReplicaPlacer; import org.apache.kafka.metadata.placement.StripedReplicaPlacer; -import org.apache.kafka.metadata.stream.S3Config; import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction; import org.apache.kafka.queue.EventQueue; import org.apache.kafka.queue.KafkaEventQueue; @@ -241,7 +243,7 @@ static public class Builder { // AutoMQ for Kafka inject start - private S3Config s3Config; + private Config streamConfig; private List quorumVoters; // AutoMQ for Kafka inject end @@ -367,8 +369,8 @@ public Builder setZkMigrationEnabled(boolean zkMigrationEnabled) { // AutoMQ for Kafka inject start - public Builder setS3Config(S3Config s3Config) { - this.s3Config = s3Config; + public Builder setStreamConfig(Config streamConfig) { + this.streamConfig = streamConfig; return this; } @@ -430,7 +432,7 @@ public QuorumController build() throws Exception { bootstrapMetadata, maxRecordsPerBatch, zkMigrationEnabled, - s3Config, + streamConfig, quorumVoters ); } catch (Exception e) { @@ -1552,6 +1554,9 @@ private void replay(ApiMessage message, Optional snapshotId, lon case UPDATE_NEXT_NODE_ID_RECORD: clusterControl.replay((UpdateNextNodeIdRecord) message); break; + case FAILOVER_CONTEXT_RECORD: + failoverControlManager.replay((FailoverContextRecord) message); + break; // AutoMQ for Kafka inject end default: throw new RuntimeException("Unhandled record type " + type); @@ -1784,7 +1789,7 @@ private enum ImbalanceSchedule { // AutoMQ for Kafka inject start - private final S3Config s3Config; + private final Config streamConfig; /** * An object which stores the controller's view of the S3 objects. @@ -1804,6 +1809,11 @@ private enum ImbalanceSchedule { */ private final KVControlManager kvControlManager; + /** + * Failover control manager which handles the failover of the failed node. 
+ */ + private final FailoverControlManager failoverControlManager; + // AutoMQ for Kafka inject end private QuorumController( @@ -1831,7 +1841,7 @@ private QuorumController( BootstrapMetadata bootstrapMetadata, int maxRecordsPerBatch, boolean zkMigrationEnabled, - S3Config s3Config, + Config streamConfig, List quorumVoters ) { this.fatalFaultHandler = fatalFaultHandler; @@ -1913,20 +1923,21 @@ private QuorumController( this.zkRecordConsumer = new MigrationRecordConsumer(); // AutoMQ for Kafka inject start - this.s3Config = s3Config; + this.streamConfig = streamConfig; S3Operator s3Operator; - if (s3Config.mock()) { + if (streamConfig.mockEnable()) { // only use for test s3Operator = new MemoryS3Operator(); } else { S3StreamMetricsRegistry.setMetricsGroup(new KafkaS3StreamMetricsGroup()); - s3Operator = new DefaultS3Operator(s3Config.endpoint(), s3Config.region(), s3Config.bucket(), false, - s3Config.getAccessKey(), s3Config.getSecretKey()); + s3Operator = new DefaultS3Operator(streamConfig.endpoint(), streamConfig.region(), streamConfig.bucket(), false, + streamConfig.accessKey(), streamConfig.secretKey()); } this.s3ObjectControlManager = new S3ObjectControlManager( - this, snapshotRegistry, logContext, clusterId, s3Config, s3Operator); + this, snapshotRegistry, logContext, clusterId, streamConfig, s3Operator); this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager); this.kvControlManager = new KVControlManager(snapshotRegistry, logContext); + this.failoverControlManager = new FailoverControlManager(this, clusterControl, snapshotRegistry, streamConfig.failoverEnable()); // AutoMQ for Kafka inject end updateWriteOffset(-1); @@ -2448,6 +2459,12 @@ public CompletableFuture deleteKVs(ControllerRequestConte ); } + @Override + public CompletableFuture failover(ControllerRequestContext context) { + return appendWriteEvent("failover", context.deadlineNs(), + failoverControlManager::failover); + } + // AutoMQ for Kafka inject end // VisibleForTesting diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/FailoverControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/FailoverControlManager.java new file mode 100644 index 0000000000..e8abf25557 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/FailoverControlManager.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.controller.stream;
+
+import org.apache.kafka.common.metadata.FailoverContextRecord;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.controller.ClusterControlManager;
+import org.apache.kafka.controller.ControllerRequestContext;
+import org.apache.kafka.controller.ControllerResult;
+import org.apache.kafka.controller.QuorumController;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.stream.FailoverStatus;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class FailoverControlManager {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FailoverControlManager.class);
+    private static final int MAX_VOLUME_ATTACH_COUNT = 1;
+    private final QuorumController quorumController;
+    private final ClusterControlManager clusterControlManager;
+    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("failover-controller", true));
+    /**
+     * failover contexts: failedNodeId -> context
+     */
+    private final TimelineHashMap<Integer, FailoverContextRecord> failoverContexts;
+    private List<FailedNode> failedNodes;
+    /**
+     * attached: failedNodeId -> context with targetNodeId
+     */
+    private final Map<Integer, FailoverContextRecord> attached = new ConcurrentHashMap<>();
+
+    public FailoverControlManager(
+        QuorumController quorumController,
+        ClusterControlManager clusterControlManager,
+        SnapshotRegistry registry, boolean failoverEnable) {
+        this.quorumController = quorumController;
+        this.clusterControlManager = clusterControlManager;
+        this.failoverContexts = new TimelineHashMap<>(registry, 0);
+        if (failoverEnable) {
+            this.scheduler.scheduleWithFixedDelay(this::runFailoverTask, 1, 1, TimeUnit.SECONDS);
+        }
+    }
+
+    void runFailoverTask() {
+        if (!quorumController.isActive()) {
+            return;
+        }
+        try {
+            scanFailedNodes();
+            this.quorumController.failover(new ControllerRequestContext(null, null, OptionalLong.empty())).get();
+            backgroundAttach();
+        } catch (Throwable e) {
+            LOGGER.error("run failover task failed", e);
+        }
+    }
+
+    void scanFailedNodes() {
+        // TODO: run command to get failed nodes
+    }
+
+    public ControllerResult<Void> failover() {
+        // code in run should be non-blocking
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        List<FailedNode> failedNodes = this.failedNodes;
+        addNewContext(failedNodes, records);
+        doFailover(records);
+        complete(failedNodes, records);
+        return ControllerResult.of(records, null);
+    }
+
+    private void addNewContext(List<FailedNode> failedNodes, List<ApiMessageAndVersion> records) {
+        for (FailedNode failedNode : failedNodes) {
+            if (failoverContexts.containsKey(failedNode.getNodeId())) {
+                // the failed node is already in failover mode, skip
+                continue;
+            }
+            records.add(new ApiMessageAndVersion(
+                new FailoverContextRecord()
+                    .setFailedNodeId(failedNode.getNodeId())
+                    .setVolumeId(failedNode.getVolumeId())
+                    .setStatus(FailoverStatus.WAITING.name()),
+                (short) 0));
+        }
+    }
+
+    private void doFailover(List<ApiMessageAndVersion> records) {
+        for (FailoverContextRecord record : attached.values()) {
+            records.add(new ApiMessageAndVersion(record, (short) 0));
+            failoverContexts.put(record.failedNodeId(), record);
+        }
+        attached.clear();
+    }
+
+    private void complete(List<FailedNode> failedNodes, List<ApiMessageAndVersion> records) {
+        Set<Integer> failedNodeIdSet = failedNodes.stream()
+            .map(FailedNode::getNodeId)
+            .collect(Collectors.toSet());
+        failoverContexts.forEach((nodeId, context) -> {
+            if (!failedNodeIdSet.contains(nodeId)) {
+                FailoverContextRecord completedRecord = context.duplicate();
+                completedRecord.setStatus(FailoverStatus.DONE.name());
+                // the target node has already completed the recovery and deleted the volume, so remove the failover context
+                records.add(new ApiMessageAndVersion(completedRecord, (short) 0));
+            }
+        });
+    }
+
+    /**
+     * Select an alive node to perform failover and move the FailoverContext status.
+     */
+    void backgroundAttach() {
+        try {
+            backgroundAttach0();
+        } catch (Throwable e) {
+            LOGGER.error("failover background attach failed", e);
+        }
+    }
+
+    void backgroundAttach0() {
+        Map<Integer, Long> attachedCounts = new HashMap<>(attached.values().stream()
+            .collect(Collectors.groupingBy(FailoverContextRecord::targetNodeId, Collectors.counting())));
+
+        List<BrokerRegistration> brokers = clusterControlManager.getActiveBrokers();
+        if (brokers.isEmpty()) {
+            return;
+        }
+
+        List<CompletableFuture<Void>> attachCfList = new ArrayList<>();
+        int attachIndex = 0;
+        for (FailoverContextRecord context : failoverContexts.values()) {
+            int failedNodeId = context.failedNodeId();
+            if (!FailoverStatus.WAITING.name().equals(context.status())) {
+                continue;
+            }
+            if (attached.containsKey(context.failedNodeId())) {
+                continue;
+            }
+            for (int i = 0; i < brokers.size(); i++, attachIndex++) {
+                BrokerRegistration broker = brokers.get(Math.abs((attachIndex + i) % brokers.size()));
+                long attachedCount = Optional.ofNullable(attachedCounts.get(broker.id())).orElse(0L);
+                if (attachedCount < MAX_VOLUME_ATTACH_COUNT) {
+                    CompletableFuture<String> attachCf = attach(new FailedNode(failedNodeId, context.volumeId()), broker.id());
+                    attachCfList.add(attachCf.thenAccept(device -> {
+                        FailoverContextRecord attachedRecord = context.duplicate();
+                        attachedRecord.setTargetNodeId(broker.id());
+                        attachedRecord.setDevice(device);
+                        attachedRecord.setStatus(FailoverStatus.RECOVERING.name());
+                        attached.put(failedNodeId, attachedRecord);
+                    }).exceptionally(ex -> {
+                        LOGGER.error("attach failed node {} to target node {} failed", context.failedNodeId(), broker.id(), ex);
+                        return null;
+                    }));
+                    attachedCounts.put(broker.id(), attachedCount + 1);
+                }
+            }
+        }
+        CompletableFuture.allOf(attachCfList.toArray(new CompletableFuture[0])).join();
+    }
+
+    public void replay(FailoverContextRecord record) {
+        failoverContexts.put(record.failedNodeId(), record);
+    }
+
+    /**
+     * Attach the failed node volume to the target node.
+     *
+     * @param failedNode   {@link FailedNode}
+     * @param targetNodeId target node id
+     * @return the device name of the volume attached to the target node
+     */
+    CompletableFuture<String> attach(FailedNode failedNode, int targetNodeId) {
+        // TODO: run command to attach
+        return CompletableFuture.completedFuture("");
+    }
+
+    static class FailedNode {
+        private int nodeId;
+        private String volumeId;
+
+        public FailedNode(int nodeId, String volumeId) {
+            this.nodeId = nodeId;
+            this.volumeId = volumeId;
+        }
+
+        public int getNodeId() {
+            return nodeId;
+        }
+
+        public void setNodeId(int nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        public String getVolumeId() {
+            return volumeId;
+        }
+
+        public void setVolumeId(String volumeId) {
+            this.volumeId = volumeId;
+        }
+    }
+
+}
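Reviewer note: the manager above is record-driven: failover() only emits FailoverContextRecord transitions, while backgroundAttach() does the slow volume-attach work off the controller thread. The standalone sketch below, which is not part of the patch, spells out the implied state machine; the status names mirror FailoverStatus in this diff, but the Context class and the on* helpers are hypothetical.

```java
// Standalone illustration of the failover state machine driven by FailoverContextRecord.
// Status names mirror FailoverStatus (WAITING/RECOVERING/DONE); everything else is hypothetical.
public class FailoverStateSketch {

    enum Status { WAITING, RECOVERING, DONE }

    static final class Context {
        final int failedNodeId;
        final String volumeId;
        Integer targetNodeId; // set once the volume is attached
        String device;        // device name of the attached volume on the target node
        Status status = Status.WAITING;

        Context(int failedNodeId, String volumeId) {
            this.failedNodeId = failedNodeId;
            this.volumeId = volumeId;
        }
    }

    // scanFailedNodes() reports a new failed node: a WAITING context is recorded.
    static Context onNodeFailed(int failedNodeId, String volumeId) {
        return new Context(failedNodeId, volumeId);
    }

    // backgroundAttach() attached the volume to an alive broker: WAITING -> RECOVERING.
    static void onAttached(Context ctx, int targetNodeId, String device) {
        ctx.targetNodeId = targetNodeId;
        ctx.device = device;
        ctx.status = Status.RECOVERING;
    }

    // The node no longer appears in the failed-node scan, i.e. the target finished
    // recovery and deleted the volume: RECOVERING -> DONE, the context can be removed.
    static void onRecovered(Context ctx) {
        ctx.status = Status.DONE;
    }
}
```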
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeMetadata.java b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeMetadata.java
new file mode 100644
index 0000000000..30ff5d2504
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeMetadata.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.controller.stream;
+
+import org.apache.kafka.metadata.stream.S3StreamSetObject;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineLong;
+import org.apache.kafka.timeline.TimelineObject;
+
+public class NodeMetadata {
+
+    private final int nodeId;
+    private final TimelineLong nodeEpoch;
+    private final TimelineObject<Boolean> failoverMode;
+    private final TimelineHashMap<Long, S3StreamSetObject> streamSetObjects;
+
+    public NodeMetadata(int nodeId, long nodeEpoch, boolean failoverMode, SnapshotRegistry registry) {
+        this.nodeId = nodeId;
+        this.nodeEpoch = new TimelineLong(registry);
+        this.nodeEpoch.set(nodeEpoch);
+        this.failoverMode = new TimelineObject<>(registry, failoverMode);
+        this.streamSetObjects = new TimelineHashMap<>(registry, 0);
+    }
+
+    public int getNodeId() {
+        return nodeId;
+    }
+
+    public long getNodeEpoch() {
+        return nodeEpoch.get();
+    }
+
+    public void setNodeEpoch(long nodeEpoch) {
+        this.nodeEpoch.set(nodeEpoch);
+    }
+
+    public boolean getFailoverMode() {
+        return failoverMode.get();
+    }
+
+    public void setFailoverMode(boolean failoverMode) {
+        this.failoverMode.set(failoverMode);
+    }
+
+    public TimelineHashMap<Long, S3StreamSetObject> streamSetObjects() {
+        return streamSetObjects;
+    }
+
+    @Override
+    public String toString() {
+        return "NodeS3StreamSetObjectMetadata{" +
+            "nodeId=" + nodeId +
+            ", nodeEpoch=" + nodeEpoch +
+            ", streamSetObjects=" + streamSetObjects +
+            '}';
+    }
+}
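Reviewer note: NodeMetadata keeps its mutable state in Timeline* containers tied to the controller's SnapshotRegistry, so uncommitted changes can be rolled back and older epochs remain readable. The minimal sketch below is not part of the patch; it assumes the org.apache.kafka.timeline API (getOrCreateSnapshot, revertToSnapshot, and the epoch-aware get overloads) behaves as in upstream Kafka.

```java
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;
import org.apache.kafka.timeline.TimelineObject;

// Standalone illustration of why NodeMetadata uses Timeline* fields: mutations are
// tracked against the controller's SnapshotRegistry and can be reverted or read as of
// an earlier snapshot epoch.
public class TimelineSketch {
    public static void main(String[] args) {
        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());

        TimelineLong nodeEpoch = new TimelineLong(registry);
        TimelineObject<Boolean> failoverMode = new TimelineObject<>(registry, false);

        nodeEpoch.set(1L);
        registry.getOrCreateSnapshot(10L);   // state as of committed offset 10

        // Tentative, not-yet-committed updates.
        nodeEpoch.set(2L);
        failoverMode.set(true);

        System.out.println(nodeEpoch.get());      // 2 (latest)
        System.out.println(nodeEpoch.get(10L));   // 1 (as of snapshot 10)

        // If the records never commit, the controller rolls everything back.
        registry.revertToSnapshot(10L);
        System.out.println(failoverMode.get());   // false again
    }
}
```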
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
index 23f323658a..e7431fb4a1 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
@@ -19,6 +19,7 @@
 import java.util.Arrays;
+import com.automq.stream.s3.Config;
 import com.automq.stream.s3.metadata.ObjectUtils;
 import com.automq.stream.s3.operator.S3Operator;
 import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
@@ -31,7 +32,6 @@
 import org.apache.kafka.controller.ControllerRequestContext;
 import org.apache.kafka.controller.ControllerResult;
 import org.apache.kafka.controller.QuorumController;
-import org.apache.kafka.metadata.stream.S3Config;
 import org.apache.kafka.metadata.stream.S3Object;
 import org.apache.kafka.metadata.stream.S3ObjectState;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -74,7 +74,7 @@ public class S3ObjectControlManager {
     private final String clusterId;
-    private final S3Config config;
+    private final Config config;
     /**
      * The objectId of the next object to be prepared. (start from 0)
@@ -99,7 +99,7 @@ public S3ObjectControlManager(
         SnapshotRegistry snapshotRegistry,
         LogContext logContext,
         String clusterId,
-        S3Config config,
+        Config config,
         S3Operator operator) {
         this.quorumController = quorumController;
         this.snapshotRegistry = snapshotRegistry;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamMetadata.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamMetadata.java
new file mode 100644
index 0000000000..31a82013cb
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamMetadata.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.controller.stream;
+
+import com.automq.stream.s3.metadata.StreamState;
+import org.apache.kafka.metadata.stream.RangeMetadata;
+import org.apache.kafka.metadata.stream.S3StreamObject;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineLong;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Map;
+
+public class S3StreamMetadata {
+
+    // current epoch, when created but not open, use -1 represent
+    private final TimelineLong currentEpoch;
+    // rangeIndex, when created but not open, there is no range, use -1 represent
+    private final TimelineInteger currentRangeIndex;
+    /**
+     * The visible start offset of stream, it may be larger than the start offset of current range.
+     */
+    private final TimelineLong startOffset;
+    private final TimelineObject<StreamState> currentState;
+    private final TimelineHashMap<Integer, RangeMetadata> ranges;
+    private final TimelineHashMap<Long, S3StreamObject> streamObjects;
+
+    public S3StreamMetadata(long currentEpoch, int currentRangeIndex, long startOffset,
+        StreamState currentState, SnapshotRegistry registry) {
+        this.currentEpoch = new TimelineLong(registry);
+        this.currentEpoch.set(currentEpoch);
+        this.currentRangeIndex = new TimelineInteger(registry);
+        this.currentRangeIndex.set(currentRangeIndex);
+        this.startOffset = new TimelineLong(registry);
+        this.startOffset.set(startOffset);
+        this.currentState = new TimelineObject<StreamState>(registry, currentState);
+        this.ranges = new TimelineHashMap<>(registry, 0);
+        this.streamObjects = new TimelineHashMap<>(registry, 0);
+    }
+
+    public long currentEpoch() {
+        return currentEpoch.get();
+    }
+
+    public void currentEpoch(long epoch) {
+        this.currentEpoch.set(epoch);
+    }
+
+    public int currentRangeIndex() {
+        return currentRangeIndex.get();
+    }
+
+    public void currentRangeIndex(int currentRangeIndex) {
+        this.currentRangeIndex.set(currentRangeIndex);
+    }
+
+    public long startOffset() {
+        return startOffset.get();
+    }
+
+    public void startOffset(long offset) {
+        this.startOffset.set(offset);
+    }
+
+    public StreamState currentState() {
+        return currentState.get();
+    }
+
+    public void currentState(StreamState state) {
+        this.currentState.set(state);
+    }
+
+    public Map<Integer, RangeMetadata> ranges() {
+        return ranges;
+    }
+
+    public RangeMetadata currentRangeMetadata() {
+        return ranges.get(currentRangeIndex.get());
+    }
+
+    public Map<Long, S3StreamObject> streamObjects() {
+        return streamObjects;
+    }
+
+    @Override
+    public String toString() {
+        return "S3StreamMetadata{" +
+            "currentEpoch=" + currentEpoch.get() +
+            ", currentState=" + currentState.get() +
+            ", currentRangeIndex=" + currentRangeIndex.get() +
+            ", startOffset=" + startOffset.get() +
+            ", ranges=" + ranges +
+            ", streamObjects=" + streamObjects +
+            '}';
+    }
+}
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index dd4ea38a32..74594867d3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -19,8 +19,7 @@ import com.automq.stream.s3.metadata.S3StreamConstant; import com.automq.stream.s3.metadata.StreamOffsetRange; -import java.util.HashMap; -import java.util.stream.Stream; +import com.automq.stream.s3.metadata.StreamState; import org.apache.kafka.common.message.CloseStreamsRequestData.CloseStreamRequest; import org.apache.kafka.common.message.CloseStreamsResponseData.CloseStreamResponse; import org.apache.kafka.common.message.CommitStreamObjectRequestData; @@ -59,22 +58,21 @@ import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3StreamSetObject; -import com.automq.stream.s3.metadata.StreamState; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; -import org.apache.kafka.timeline.TimelineInteger; import org.apache.kafka.timeline.TimelineLong; -import org.apache.kafka.timeline.TimelineObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID; @@ -84,110 +82,6 @@ @SuppressWarnings("all") public class StreamControlManager { private static final Logger LOGGER = LoggerFactory.getLogger(StreamControlManager.class); - - public static class S3StreamMetadata { - - // current epoch, when created but not open, use -1 represent - private final TimelineLong currentEpoch; - // rangeIndex, when created but not open, there is no range, use -1 represent - private final TimelineInteger currentRangeIndex; - /** - * The visible start offset of stream, it may be larger than the start offset of current range. 
- */ - private final TimelineLong startOffset; - private final TimelineObject currentState; - private final TimelineHashMap ranges; - private final TimelineHashMap streamObjects; - - public S3StreamMetadata(long currentEpoch, int currentRangeIndex, long startOffset, - StreamState currentState, SnapshotRegistry registry) { - this.currentEpoch = new TimelineLong(registry); - this.currentEpoch.set(currentEpoch); - this.currentRangeIndex = new TimelineInteger(registry); - this.currentRangeIndex.set(currentRangeIndex); - this.startOffset = new TimelineLong(registry); - this.startOffset.set(startOffset); - this.currentState = new TimelineObject(registry, currentState); - this.ranges = new TimelineHashMap<>(registry, 0); - this.streamObjects = new TimelineHashMap<>(registry, 0); - } - - public long currentEpoch() { - return currentEpoch.get(); - } - - public int currentRangeIndex() { - return currentRangeIndex.get(); - } - - public long startOffset() { - return startOffset.get(); - } - - public StreamState currentState() { - return currentState.get(); - } - - public Map ranges() { - return ranges; - } - - public RangeMetadata currentRangeMetadata() { - return ranges.get(currentRangeIndex.get()); - } - - public Map streamObjects() { - return streamObjects; - } - - @Override - public String toString() { - return "S3StreamMetadata{" + - "currentEpoch=" + currentEpoch.get() + - ", currentState=" + currentState.get() + - ", currentRangeIndex=" + currentRangeIndex.get() + - ", startOffset=" + startOffset.get() + - ", ranges=" + ranges + - ", streamObjects=" + streamObjects + - '}'; - } - } - - public static class NodeS3StreamSetObjectMetadata { - - private final int nodeId; - private final TimelineLong nodeEpoch; - private final TimelineHashMap streamSetObjects; - - public NodeS3StreamSetObjectMetadata(int nodeId, long nodeEpoch, SnapshotRegistry registry) { - this.nodeId = nodeId; - this.nodeEpoch = new TimelineLong(registry); - this.nodeEpoch.set(nodeEpoch); - this.streamSetObjects = new TimelineHashMap<>(registry, 0); - } - - public int getNodeId() { - return nodeId; - } - - public long getNodeEpoch() { - return nodeEpoch.get(); - } - - public TimelineHashMap streamSetObjects() { - return streamSetObjects; - } - - @Override - public String toString() { - return "NodeS3StreamSetObjectMetadata{" + - "nodeId=" + nodeId + - ", nodeEpoch=" + nodeEpoch + - ", streamSetObjects=" + streamSetObjects + - '}'; - } - } - private final SnapshotRegistry snapshotRegistry; private final Logger log; @@ -201,7 +95,7 @@ public String toString() { private final TimelineHashMap streamsMetadata; - private final TimelineHashMap nodesMetadata; + private final TimelineHashMap nodesMetadata; public StreamControlManager( SnapshotRegistry snapshotRegistry, @@ -223,20 +117,20 @@ public ControllerResult createStream(int nodeId, long node if (nodeEpochCheckResult != Errors.NONE) { resp.setErrorCode(nodeEpochCheckResult.code()); log.warn("[CreateStream] nodeId={}'s epoch={} check failed, code: {}", - nodeId, nodeEpoch, nodeEpochCheckResult.code()); + nodeId, nodeEpoch, nodeEpochCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } // TODO: pre assigned a batch of stream id in controller long streamId = nextAssignedStreamId.get(); // update assigned id ApiMessageAndVersion record0 = new ApiMessageAndVersion(new AssignedStreamIdRecord() - .setAssignedStreamId(streamId), (short) 0); + .setAssignedStreamId(streamId), (short) 0); // create stream ApiMessageAndVersion record = new ApiMessageAndVersion(new 
S3StreamRecord() - .setStreamId(streamId) - .setEpoch(S3StreamConstant.INIT_EPOCH) - .setStartOffset(S3StreamConstant.INIT_START_OFFSET) - .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX), (short) 0); + .setStreamId(streamId) + .setEpoch(S3StreamConstant.INIT_EPOCH) + .setStartOffset(S3StreamConstant.INIT_START_OFFSET) + .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX), (short) 0); resp.setStreamId(streamId); log.info("[CreateStream] create streamId={} success", streamId); return ControllerResult.atomicOf(Arrays.asList(record0, record), resp); @@ -286,7 +180,7 @@ public ControllerResult openStream(int nodeId, long nodeEpoc if (nodeEpochCheckResult != Errors.NONE) { resp.setErrorCode(nodeEpochCheckResult.code()); log.warn("[OpenStream] nodeId={}'s epoch={} check failed, code: {}", - nodeId, nodeEpoch, nodeEpochCheckResult.code()); + nodeId, nodeEpoch, nodeEpochCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -298,26 +192,26 @@ public ControllerResult openStream(int nodeId, long nodeEpoc } // verify epoch match S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); - if (streamMetadata.currentEpoch.get() > epoch) { + if (streamMetadata.currentEpoch() > epoch) { resp.setErrorCode(Errors.STREAM_FENCED.code()); log.warn("[OpenStream] streamId={}'s epoch={} is larger than request epoch {}", streamId, - streamMetadata.currentEpoch.get(), epoch); + streamMetadata.currentEpoch(), epoch); return ControllerResult.of(Collections.emptyList(), resp); } - if (streamMetadata.currentEpoch.get() == epoch) { + if (streamMetadata.currentEpoch() == epoch) { // node may use the same epoch to open -> close -> open stream. // verify node - RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex()); + RangeMetadata rangeMetadata = streamMetadata.ranges().get(streamMetadata.currentRangeIndex()); if (rangeMetadata == null) { // should not happen log.error("[OpenStream] streamId={}'s current range={} not exist when open stream with epoch={}", streamId, - streamMetadata.currentRangeIndex(), epoch); + streamMetadata.currentRangeIndex(), epoch); resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); return ControllerResult.of(Collections.emptyList(), resp); } if (rangeMetadata.nodeId() != nodeId) { log.warn("[OpenStream] streamId={}'s current range={}'s nodeId={} is not equal to request nodeId={}", - streamId, streamMetadata.currentRangeIndex(), rangeMetadata.nodeId(), nodeId); + streamId, streamMetadata.currentRangeIndex(), rangeMetadata.nodeId(), nodeId); resp.setErrorCode(Errors.STREAM_FENCED.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -327,11 +221,11 @@ public ControllerResult openStream(int nodeId, long nodeEpoc List records = new ArrayList<>(); if (streamMetadata.currentState() == StreamState.CLOSED) { records.add(new ApiMessageAndVersion(new S3StreamRecord() - .setStreamId(streamId) - .setEpoch(epoch) - .setRangeIndex(streamMetadata.currentRangeIndex()) - .setStartOffset(streamMetadata.startOffset()) - .setStreamState(StreamState.OPENED.toByte()), (short) 0)); + .setStreamId(streamId) + .setEpoch(epoch) + .setRangeIndex(streamMetadata.currentRangeIndex()) + .setStartOffset(streamMetadata.startOffset()) + .setStreamState(StreamState.OPENED.toByte()), (short) 0)); } return ControllerResult.of(records, resp); } @@ -346,27 +240,27 @@ public ControllerResult openStream(int nodeId, long nodeEpoc int newRangeIndex = streamMetadata.currentRangeIndex() + 1; // stream update record records.add(new 
ApiMessageAndVersion(new S3StreamRecord() - .setStreamId(streamId) - .setEpoch(epoch) - .setRangeIndex(newRangeIndex) - .setStartOffset(streamMetadata.startOffset()) - .setStreamState(StreamState.OPENED.toByte()), (short) 0)); + .setStreamId(streamId) + .setEpoch(epoch) + .setRangeIndex(newRangeIndex) + .setStartOffset(streamMetadata.startOffset()) + .setStreamState(StreamState.OPENED.toByte()), (short) 0)); // get new range's start offset // default regard this range is the first range in stream, use 0 as start offset long startOffset = 0; if (newRangeIndex > 0) { // means that the new range is not the first range in stream, get the last range's end offset - RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get()); + RangeMetadata lastRangeMetadata = streamMetadata.ranges().get(streamMetadata.currentRangeIndex()); startOffset = lastRangeMetadata.endOffset(); } // range create record records.add(new ApiMessageAndVersion(new RangeRecord() - .setStreamId(streamId) - .setNodeId(nodeId) - .setStartOffset(startOffset) - .setEndOffset(startOffset) - .setEpoch(epoch) - .setRangeIndex(newRangeIndex), (short) 0)); + .setStreamId(streamId) + .setNodeId(nodeId) + .setStartOffset(startOffset) + .setEndOffset(startOffset) + .setEpoch(epoch) + .setRangeIndex(newRangeIndex), (short) 0)); resp.setStartOffset(streamMetadata.startOffset()); resp.setNextOffset(startOffset); log.info("[OpenStream] nodeId={} open streamId={} with epoch={} success", nodeId, streamId, epoch); @@ -406,11 +300,11 @@ public ControllerResult closeStream(int nodeId, long nodeEp long epoch = request.streamEpoch(); // verify node epoch - Errors nodeEpochCheckResult = nodeEpochCheck(nodeId, nodeEpoch); + Errors nodeEpochCheckResult = nodeEpochCheck(nodeId, nodeEpoch, false); if (nodeEpochCheckResult != Errors.NONE) { resp.setErrorCode(nodeEpochCheckResult.code()); log.warn("[CloseStream] nodeId={}'s epoch={} check failed, code: {}", - nodeId, nodeEpoch, nodeEpochCheckResult.code()); + nodeId, nodeEpoch, nodeEpochCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -428,12 +322,12 @@ public ControllerResult closeStream(int nodeId, long nodeEp // now the request in valid, update the stream's state // stream update record List records = List.of( - new ApiMessageAndVersion(new S3StreamRecord() - .setStreamId(streamId) - .setEpoch(epoch) - .setRangeIndex(streamMetadata.currentRangeIndex()) - .setStartOffset(streamMetadata.startOffset()) - .setStreamState(StreamState.CLOSED.toByte()), (short) 0)); + new ApiMessageAndVersion(new S3StreamRecord() + .setStreamId(streamId) + .setEpoch(epoch) + .setRangeIndex(streamMetadata.currentRangeIndex()) + .setStartOffset(streamMetadata.startOffset()) + .setStreamState(StreamState.CLOSED.toByte()), (short) 0)); log.info("[CloseStream] nodeId={} close streamId={} with epochId={} success", nodeId, streamId, epoch); return ControllerResult.atomicOf(records, resp); } @@ -450,7 +344,7 @@ public ControllerResult trimStream(int nodeId, long nodeEpoc if (nodeEpochCheckResult != Errors.NONE) { resp.setErrorCode(nodeEpochCheckResult.code()); log.warn("[TrimStream] nodeId={}'s epoch={} check failed, code: {}", - nodeId, nodeEpoch, nodeEpochCheckResult.code()); + nodeId, nodeEpoch, nodeEpochCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -468,7 +362,7 @@ public ControllerResult trimStream(int nodeId, long nodeEpoc } if (streamMetadata.startOffset() > newStartOffset) { log.warn("[TrimStream] streamId={}'s 
start offset {} is larger than request new start offset {}", - streamId, streamMetadata.startOffset(), newStartOffset); + streamId, streamMetadata.startOffset(), newStartOffset); resp.setErrorCode(Errors.OFFSET_NOT_MATCHED.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -480,13 +374,13 @@ public ControllerResult trimStream(int nodeId, long nodeEpoc // update the stream metadata start offset List records = new ArrayList<>(); records.add(new ApiMessageAndVersion(new S3StreamRecord() - .setStreamId(streamId) - .setEpoch(epoch) - .setRangeIndex(streamMetadata.currentRangeIndex()) - .setStartOffset(newStartOffset) - .setStreamState(streamMetadata.currentState().toByte()), (short) 0)); + .setStreamId(streamId) + .setEpoch(epoch) + .setRangeIndex(streamMetadata.currentRangeIndex()) + .setStartOffset(newStartOffset) + .setStreamState(streamMetadata.currentState().toByte()), (short) 0)); // remove range or update range's start offset - streamMetadata.ranges.entrySet().stream().forEach(it -> { + streamMetadata.ranges().entrySet().stream().forEach(it -> { Integer rangeIndex = it.getKey(); RangeMetadata range = it.getValue(); if (newStartOffset <= range.startOffset()) { @@ -500,32 +394,32 @@ public ControllerResult trimStream(int nodeId, long nodeEpoc // 3. try to trim to 110, then current range will be [100, 100) long newRangeStartOffset = newStartOffset < range.endOffset() ? newStartOffset : range.endOffset(); records.add(new ApiMessageAndVersion(new RangeRecord() - .setStreamId(streamId) - .setRangeIndex(rangeIndex) - .setNodeId(range.nodeId()) - .setEpoch(range.epoch()) - .setStartOffset(newRangeStartOffset) - .setEndOffset(range.endOffset()), (short) 0)); + .setStreamId(streamId) + .setRangeIndex(rangeIndex) + .setNodeId(range.nodeId()) + .setEpoch(range.epoch()) + .setStartOffset(newRangeStartOffset) + .setEndOffset(range.endOffset()), (short) 0)); return; } if (newStartOffset >= range.endOffset()) { // remove range records.add(new ApiMessageAndVersion(new RemoveRangeRecord() - .setStreamId(streamId) - .setRangeIndex(rangeIndex), (short) 0)); + .setStreamId(streamId) + .setRangeIndex(rangeIndex), (short) 0)); return; } // update range's start offset records.add(new ApiMessageAndVersion(new RangeRecord() - .setStreamId(streamId) - .setNodeId(range.nodeId()) - .setStartOffset(newStartOffset) - .setEndOffset(range.endOffset()) - .setEpoch(range.epoch()) - .setRangeIndex(rangeIndex), (short) 0)); + .setStreamId(streamId) + .setNodeId(range.nodeId()) + .setStartOffset(newStartOffset) + .setEndOffset(range.endOffset()) + .setEpoch(range.epoch()) + .setRangeIndex(rangeIndex), (short) 0)); }); // remove stream object - streamMetadata.streamObjects.entrySet().stream().forEach(it -> { + streamMetadata.streamObjects().entrySet().stream().forEach(it -> { Long objectId = it.getKey(); S3StreamObject streamObject = it.getValue(); long streamStartOffset = streamObject.streamOffsetRange().getStartOffset(); @@ -536,10 +430,10 @@ public ControllerResult trimStream(int nodeId, long nodeEpoc if (newStartOffset >= streamEndOffset) { // remove stream object records.add(new ApiMessageAndVersion(new RemoveS3StreamObjectRecord() - .setStreamId(streamId) - .setObjectId(objectId), (short) 0)); + .setStreamId(streamId) + .setObjectId(objectId), (short) 0)); ControllerResult markDestroyResult = this.s3ObjectControlManager.markDestroyObjects( - Collections.singletonList(objectId)); + Collections.singletonList(objectId)); if (!markDestroyResult.response()) { log.error("[TrimStream] Mark destroy stream 
object: {} failed", objectId); resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); @@ -554,38 +448,38 @@ public ControllerResult trimStream(int nodeId, long nodeEpoc // remove stream set object or remove stream range in stream set object // TODO: optimize this.nodesMetadata.values() - .stream() - .flatMap(entry -> entry.streamSetObjects.values().stream()) - .filter(streamSetObject -> streamSetObject.offsetRanges().containsKey(streamId)) - .filter(streamSetObject -> streamSetObject.offsetRanges().get(streamId).getEndOffset() <= newStartOffset) - .forEach(streamSetObj -> { - if (streamSetObj.offsetRanges().size() == 1) { - // only this range, but we will remove this range, so now we can remove this stream set object - records.add(new ApiMessageAndVersion( - new RemoveStreamSetObjectRecord() - .setNodeId(streamSetObj.nodeId()) - .setObjectId(streamSetObj.objectId()), (short) 0 - )); - ControllerResult markDestroyResult = this.s3ObjectControlManager.markDestroyObjects( - List.of(streamSetObj.objectId())); - if (!markDestroyResult.response()) { - log.error("[TrimStream] Mark destroy stream set object: {} failed", streamSetObj.objectId()); - resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); + .stream() + .flatMap(entry -> entry.streamSetObjects().values().stream()) + .filter(streamSetObject -> streamSetObject.offsetRanges().containsKey(streamId)) + .filter(streamSetObject -> streamSetObject.offsetRanges().get(streamId).getEndOffset() <= newStartOffset) + .forEach(streamSetObj -> { + if (streamSetObj.offsetRanges().size() == 1) { + // only this range, but we will remove this range, so now we can remove this stream set object + records.add(new ApiMessageAndVersion( + new RemoveStreamSetObjectRecord() + .setNodeId(streamSetObj.nodeId()) + .setObjectId(streamSetObj.objectId()), (short) 0 + )); + ControllerResult markDestroyResult = this.s3ObjectControlManager.markDestroyObjects( + List.of(streamSetObj.objectId())); + if (!markDestroyResult.response()) { + log.error("[TrimStream] Mark destroy stream set object: {} failed", streamSetObj.objectId()); + resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); + return; + } + records.addAll(markDestroyResult.records()); return; } - records.addAll(markDestroyResult.records()); - return; - } - Map newOffsetRange = new HashMap<>(streamSetObj.offsetRanges()); - // remove offset range - newOffsetRange.remove(streamId); - records.add(new ApiMessageAndVersion(new S3StreamSetObjectRecord() - .setObjectId(streamSetObj.objectId()) - .setNodeId(streamSetObj.nodeId()) - .setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList())) - .setDataTimeInMs(streamSetObj.dataTimeInMs()) - .setOrderId(streamSetObj.orderId()), (short) 0)); - }); + Map newOffsetRange = new HashMap<>(streamSetObj.offsetRanges()); + // remove offset range + newOffsetRange.remove(streamId); + records.add(new ApiMessageAndVersion(new S3StreamSetObjectRecord() + .setObjectId(streamSetObj.objectId()) + .setNodeId(streamSetObj.nodeId()) + .setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList())) + .setDataTimeInMs(streamSetObj.dataTimeInMs()) + .setOrderId(streamSetObj.orderId()), (short) 0)); + }); if (resp.errorCode() != Errors.NONE.code()) { return ControllerResult.of(Collections.emptyList(), resp); } @@ -603,7 +497,7 @@ public ControllerResult deleteStream(int nodeId, long node if (nodeEpochCheckResult != Errors.NONE) { resp.setErrorCode(nodeEpochCheckResult.code()); log.warn("[DeleteStream] nodeId={}'s epoch={} check 
failed, code: {}", - nodeId, nodeEpoch, nodeEpochCheckResult.code()); + nodeId, nodeEpoch, nodeEpochCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -619,9 +513,9 @@ public ControllerResult deleteStream(int nodeId, long node // generate remove stream record List records = new ArrayList<>(); records.add(new ApiMessageAndVersion(new RemoveS3StreamRecord() - .setStreamId(streamId), (short) 0)); + .setStreamId(streamId), (short) 0)); // generate stream objects destroy records - List streamObjectIds = streamMetadata.streamObjects.keySet().stream().collect(Collectors.toList()); + List streamObjectIds = streamMetadata.streamObjects().keySet().stream().collect(Collectors.toList()); ControllerResult markDestroyResult = this.s3ObjectControlManager.markDestroyObjects(streamObjectIds); if (!markDestroyResult.response()) { log.error("[DeleteStream]: Mark destroy stream objects: {} failed", streamObjectIds); @@ -631,37 +525,37 @@ public ControllerResult deleteStream(int nodeId, long node records.addAll(markDestroyResult.records()); // remove stream set object or remove stream-offset-range in stream set object this.nodesMetadata.values() - .stream() - .flatMap(entry -> entry.streamSetObjects.values().stream()) - .filter(streamsSetObject -> streamsSetObject.offsetRanges().containsKey(streamId)) - .forEach(streamSetObj -> { - if (streamSetObj.offsetRanges().size() == 1) { - // only this range, but we will remove this range, so now we can remove this stream set object - records.add(new ApiMessageAndVersion( - new RemoveStreamSetObjectRecord() - .setNodeId(streamSetObj.nodeId()) - .setObjectId(streamSetObj.objectId()), (short) 0 - )); - ControllerResult result = this.s3ObjectControlManager.markDestroyObjects( - List.of(streamSetObj.objectId())); - if (!result.response()) { - log.error("[DeleteStream]: Mark destroy stream set object: {} failed", streamSetObj.objectId()); - resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); + .stream() + .flatMap(entry -> entry.streamSetObjects().values().stream()) + .filter(streamsSetObject -> streamsSetObject.offsetRanges().containsKey(streamId)) + .forEach(streamSetObj -> { + if (streamSetObj.offsetRanges().size() == 1) { + // only this range, but we will remove this range, so now we can remove this stream set object + records.add(new ApiMessageAndVersion( + new RemoveStreamSetObjectRecord() + .setNodeId(streamSetObj.nodeId()) + .setObjectId(streamSetObj.objectId()), (short) 0 + )); + ControllerResult result = this.s3ObjectControlManager.markDestroyObjects( + List.of(streamSetObj.objectId())); + if (!result.response()) { + log.error("[DeleteStream]: Mark destroy stream set object: {} failed", streamSetObj.objectId()); + resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); + return; + } + records.addAll(result.records()); return; } - records.addAll(result.records()); - return; - } - Map newOffsetRange = new HashMap<>(streamSetObj.offsetRanges()); - // remove offset range - newOffsetRange.remove(streamId); - records.add(new ApiMessageAndVersion(new S3StreamSetObjectRecord() - .setObjectId(streamSetObj.objectId()) - .setNodeId(streamSetObj.nodeId()) - .setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList())) - .setDataTimeInMs(streamSetObj.dataTimeInMs()) - .setOrderId(streamSetObj.orderId()), (short) 0)); - }); + Map newOffsetRange = new HashMap<>(streamSetObj.offsetRanges()); + // remove offset range + newOffsetRange.remove(streamId); + records.add(new ApiMessageAndVersion(new 
S3StreamSetObjectRecord() + .setObjectId(streamSetObj.objectId()) + .setNodeId(streamSetObj.nodeId()) + .setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList())) + .setDataTimeInMs(streamSetObj.dataTimeInMs()) + .setOrderId(streamSetObj.orderId()), (short) 0)); + }); if (resp.errorCode() != Errors.NONE.code()) { return ControllerResult.of(Collections.emptyList(), resp); } @@ -699,11 +593,11 @@ public ControllerResult commitStreamSetObject long orderId = data.orderId(); // verify node epoch - Errors nodeEpochCheckResult = nodeEpochCheck(nodeId, nodeEpoch); + Errors nodeEpochCheckResult = nodeEpochCheck(nodeId, nodeEpoch, !data.failoverMode()); if (nodeEpochCheckResult != Errors.NONE) { resp.setErrorCode(nodeEpochCheckResult.code()); log.warn("[CommitStreamSetObject] nodeId={}'s epoch={} check failed, code: {}", - nodeId, nodeEpoch, nodeEpochCheckResult.code()); + nodeId, nodeEpoch, nodeEpochCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -715,13 +609,13 @@ public ControllerResult commitStreamSetObject if (compactedObjectIds == null || compactedObjectIds.isEmpty()) { // verify stream continuity List offsetRanges = Stream.concat( - streamRanges - .stream() - .map(range -> new StreamOffsetRange(range.streamId(), range.startOffset(), range.endOffset())), - streamObjects - .stream() - .map(obj -> new StreamOffsetRange(obj.streamId(), obj.startOffset(), obj.endOffset()))) - .collect(Collectors.toList()); + streamRanges + .stream() + .map(range -> new StreamOffsetRange(range.streamId(), range.startOffset(), range.endOffset())), + streamObjects + .stream() + .map(obj -> new StreamOffsetRange(obj.streamId(), obj.startOffset(), obj.endOffset()))) + .collect(Collectors.toList()); Errors continuityCheckResult = streamAdvanceCheck(offsetRanges, data.nodeId()); if (continuityCheckResult != Errors.NONE) { log.error("[CommitStreamSetObject] streamId={} advance check failed, error: {}", offsetRanges, continuityCheckResult); @@ -756,20 +650,13 @@ public ControllerResult commitStreamSetObject // update dataTs to the min compacted object's dataTs //noinspection OptionalGetWithoutIsPresent dataTs = compactedObjectIds.stream() - .map(id -> this.nodesMetadata.get(nodeId).streamSetObjects.get(id)) + .map(id -> this.nodesMetadata.get(nodeId).streamSetObjects().get(id)) .map(S3StreamSetObject::dataTimeInMs) .min(Long::compareTo).get(); } List indexes = streamRanges.stream() .map(range -> new StreamOffsetRange(range.streamId(), range.startOffset(), range.endOffset())) .collect(Collectors.toList()); - // update node's stream set object - NodeS3StreamSetObjectMetadata nodeMetadata = this.nodesMetadata.get(nodeId); - if (nodeMetadata == null) { - // first time commit stream set object, generate node's metadata record - records.add(new ApiMessageAndVersion(new NodeWALMetadataRecord() - .setNodeId(nodeId), (short) 0)); - } if (objectId != NOOP_OBJECT_ID) { // generate node's stream set object record List streamIndexes = indexes.stream() @@ -851,7 +738,7 @@ public ControllerResult commitStreamObject(Commi if (nodeEpochCheckResult != Errors.NONE) { resp.setErrorCode(nodeEpochCheckResult.code()); log.warn("[CommitStreamObject]: nodeId={}'s epoch={} check failed, code: {}", - nodeId, nodeEpoch, nodeEpochCheckResult.code()); + nodeId, nodeEpoch, nodeEpochCheckResult.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -882,7 +769,7 @@ public ControllerResult commitStreamObject(Commi // update dataTs to the min compacted object's 
dataTs //noinspection OptionalGetWithoutIsPresent dataTs = sourceObjectIds.stream() - .map(id -> this.streamsMetadata.get(streamId).streamObjects.get(id)) + .map(id -> this.streamsMetadata.get(streamId).streamObjects().get(id)) .map(S3StreamObject::dataTimeInMs) .min(Long::compareTo).get(); } @@ -909,39 +796,53 @@ public ControllerResult getOpeningStreams(GetOpen GetOpeningStreamsResponseData resp = new GetOpeningStreamsResponseData(); int nodeId = data.nodeId(); long nodeEpoch = data.nodeEpoch(); + boolean failoverMode = data.failoverMode(); + + List records = new ArrayList<>(); + + NodeMetadata nodeMetadata = this.nodesMetadata.get(nodeId); + if (nodeMetadata == null) { + // create a new node metadata if absent + records.add(new ApiMessageAndVersion( + new NodeWALMetadataRecord().setNodeId(nodeId).setNodeEpoch(nodeEpoch).setFailoverMode(failoverMode), + (short) 0)); + } // verify and update node epoch - if (nodesMetadata.containsKey(nodeId) && nodeEpoch < nodesMetadata.get(nodeId).getNodeEpoch()) { + if (nodeMetadata != null && nodeEpoch < nodeMetadata.getNodeEpoch()) { // node epoch has been expired resp.setErrorCode(Errors.NODE_EPOCH_EXPIRED.code()); log.warn("[GetOpeningStreams]: nodeId={}'s epoch={} has been expired", nodeId, nodeEpoch); return ControllerResult.of(Collections.emptyList(), resp); } - List records = new ArrayList<>(); - // update node epoch - records.add(new ApiMessageAndVersion(new NodeWALMetadataRecord() - .setNodeId(nodeId) - .setNodeEpoch(nodeEpoch), (short) 0)); + + if (nodeMetadata != null) { + // update node epoch + records.add(new ApiMessageAndVersion( + new NodeWALMetadataRecord().setNodeId(nodeId).setNodeEpoch(nodeEpoch).setFailoverMode(failoverMode), + (short) 0)); + } + // The getOpeningStreams is invoked when node startup, so we just iterate all streams to get the node opening streams. 
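
Note: the hunks above and below replace the removed NodeS3StreamSetObjectMetadata inner class with a standalone org.apache.kafka.controller.stream.NodeMetadata, which is referenced throughout this diff but whose source is not part of this excerpt. The following is only a minimal sketch of what that class plausibly looks like, assuming it keeps the timeline-backed fields of the removed inner class and adds the failover flag; the field layout and method bodies are assumptions inferred from the call sites visible here (getNodeEpoch, setNodeEpoch, getFailoverMode, setFailoverMode, streamSetObjects), not the committed implementation.

    package org.apache.kafka.controller.stream;

    import org.apache.kafka.metadata.stream.S3StreamSetObject;
    import org.apache.kafka.timeline.SnapshotRegistry;
    import org.apache.kafka.timeline.TimelineHashMap;
    import org.apache.kafka.timeline.TimelineLong;
    import org.apache.kafka.timeline.TimelineObject;

    import java.util.Map;

    // Illustrative sketch only: the real NodeMetadata class is not included in this diff excerpt.
    public class NodeMetadata {
        private final int nodeId;
        private final TimelineLong nodeEpoch;               // last seen node epoch, snapshot-aware
        private final TimelineObject<Boolean> failoverMode; // true while the node is fenced for failover
        private final TimelineHashMap<Long, S3StreamSetObject> streamSetObjects;

        public NodeMetadata(int nodeId, long nodeEpoch, boolean failoverMode, SnapshotRegistry registry) {
            this.nodeId = nodeId;
            this.nodeEpoch = new TimelineLong(registry);
            this.nodeEpoch.set(nodeEpoch);
            this.failoverMode = new TimelineObject<>(registry, failoverMode);
            this.streamSetObjects = new TimelineHashMap<>(registry, 0);
        }

        public int getNodeId() {
            return nodeId;
        }

        public long getNodeEpoch() {
            return nodeEpoch.get();
        }

        public void setNodeEpoch(long epoch) {
            nodeEpoch.set(epoch);
        }

        public boolean getFailoverMode() {
            return failoverMode.get();
        }

        public void setFailoverMode(boolean failover) {
            failoverMode.set(failover);
        }

        public Map<Long, S3StreamSetObject> streamSetObjects() {
            return streamSetObjects;
        }
    }
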
List streamStatusList = this.streamsMetadata.entrySet().stream().filter(entry -> { S3StreamMetadata streamMetadata = entry.getValue(); - if (!StreamState.OPENED.equals(streamMetadata.currentState.get())) { + if (!StreamState.OPENED.equals(streamMetadata.currentState())) { return false; } - int rangeIndex = streamMetadata.currentRangeIndex.get(); + int rangeIndex = streamMetadata.currentRangeIndex(); if (rangeIndex < 0) { return false; } - RangeMetadata rangeMetadata = streamMetadata.ranges.get(rangeIndex); + RangeMetadata rangeMetadata = streamMetadata.ranges().get(rangeIndex); return rangeMetadata.nodeId() == nodeId; }).map(e -> { S3StreamMetadata streamMetadata = e.getValue(); - RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get()); + RangeMetadata rangeMetadata = streamMetadata.ranges().get(streamMetadata.currentRangeIndex()); return new StreamMetadata() .setStreamId(e.getKey()) - .setEpoch(streamMetadata.currentEpoch.get()) - .setStartOffset(streamMetadata.startOffset.get()) + .setEpoch(streamMetadata.currentEpoch()) + .setStartOffset(streamMetadata.startOffset()) .setEndOffset(rangeMetadata.endOffset()); }).collect(Collectors.toList()); resp.setStreamMetadataList(streamStatusList); @@ -962,26 +863,26 @@ private Errors streamOwnershipCheck(long streamId, long epoch, int nodeId, Strin S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); if (streamMetadata.currentEpoch() > epoch) { log.warn("[{}]: streamId={}'s epoch={} is larger than request epoch={}", operationName, streamId, - streamMetadata.currentEpoch.get(), epoch); + streamMetadata.currentEpoch(), epoch); return Errors.STREAM_FENCED; } if (streamMetadata.currentEpoch() < epoch) { // should not happen log.error("[{}]: streamId={}'s epoch={} is smaller than request epoch={}", operationName, streamId, - streamMetadata.currentEpoch.get(), epoch); + streamMetadata.currentEpoch(), epoch); return Errors.STREAM_INNER_ERROR; } // verify node - RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex()); + RangeMetadata rangeMetadata = streamMetadata.ranges().get(streamMetadata.currentRangeIndex()); if (rangeMetadata == null) { // should not happen log.error("[{}]: streamId={}'s current range={} not exist when trim stream with epoch={}", operationName, streamId, - streamMetadata.currentRangeIndex(), epoch); + streamMetadata.currentRangeIndex(), epoch); return Errors.STREAM_INNER_ERROR; } if (rangeMetadata.nodeId() != nodeId) { log.warn("[{}]: streamId={}'s current range={}'s nodeId={} is not equal to request nodeId={}", operationName, - streamId, streamMetadata.currentRangeIndex(), rangeMetadata.nodeId(), nodeId); + streamId, streamMetadata.currentRangeIndex(), rangeMetadata.nodeId(), nodeId); return Errors.STREAM_FENCED; } return Errors.NONE; @@ -1022,7 +923,7 @@ private Errors streamAdvanceCheck(List ranges, int nodeId) { if (rangeMetadata == null) { // should not happen log.error("[streamAdvanceCheck]: streamId={}'s current range={} not exist when stream has been ", - range.getStreamId(), this.streamsMetadata.get(range.getStreamId()).currentRangeIndex()); + range.getStreamId(), this.streamsMetadata.get(range.getStreamId()).currentRangeIndex()); return Errors.STREAM_INNER_ERROR; } else if (rangeMetadata.nodeId() != nodeId) { // should not happen @@ -1032,28 +933,37 @@ private Errors streamAdvanceCheck(List ranges, int nodeId) { } if (rangeMetadata.endOffset() != range.getStartOffset()) { log.warn("[streamAdvanceCheck]: streamId={}'s current 
range={}'s end offset {} is not equal to request start offset {}", - range.getStreamId(), this.streamsMetadata.get(range.getStreamId()).currentRangeIndex(), - rangeMetadata.endOffset(), range.getStartOffset()); + range.getStreamId(), this.streamsMetadata.get(range.getStreamId()).currentRangeIndex(), + rangeMetadata.endOffset(), range.getStartOffset()); return Errors.OFFSET_NOT_MATCHED; } } return Errors.NONE; } + private Errors nodeEpochCheck(int nodeId, long nodeEpoch) { + return nodeEpochCheck(nodeId, nodeEpoch, true); + } + /** * Check whether this node is valid to operate the stream related resources. */ - private Errors nodeEpochCheck(int nodeId, long nodeEpoch) { - if (!this.nodesMetadata.containsKey(nodeId)) { + private Errors nodeEpochCheck(int nodeId, long nodeEpoch, boolean checkFailover) { + NodeMetadata nodeMetadata = this.nodesMetadata.get(nodeId); + if (nodeMetadata == null) { // should not happen log.error("[NodeEpochCheck]: nodeId={} not exist when check node epoch", nodeId); return Errors.NODE_EPOCH_NOT_EXIST; } - if (this.nodesMetadata.get(nodeId).getNodeEpoch() > nodeEpoch) { + if (nodeMetadata.getNodeEpoch() > nodeEpoch) { log.warn("[NodeEpochCheck]: nodeId={}'s epoch={} is larger than request epoch={}", nodeId, - this.nodesMetadata.get(nodeId).getNodeEpoch(), nodeEpoch); + this.nodesMetadata.get(nodeId).getNodeEpoch(), nodeEpoch); return Errors.NODE_EPOCH_EXPIRED; } + if (checkFailover && nodeMetadata.getFailoverMode()) { + log.warn("[NodeEpochCheck]: nodeId={} epoch={} is fenced", nodeId, nodeEpoch); + return Errors.NODE_FENCED; + } return Errors.NONE; } @@ -1066,10 +976,10 @@ public void replay(S3StreamRecord record) { // already exist, update the stream's self metadata if (this.streamsMetadata.containsKey(streamId)) { S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); - streamMetadata.startOffset.set(record.startOffset()); - streamMetadata.currentEpoch.set(record.epoch()); - streamMetadata.currentRangeIndex.set(record.rangeIndex()); - streamMetadata.currentState.set(StreamState.fromByte(record.streamState())); + streamMetadata.startOffset(record.startOffset()); + streamMetadata.currentEpoch(record.epoch()); + streamMetadata.currentRangeIndex(record.rangeIndex()); + streamMetadata.currentState(StreamState.fromByte(record.streamState())); return; } // not exist, create a new stream @@ -1091,7 +1001,7 @@ public void replay(RangeRecord record) { log.error("streamId={} not exist when replay range record {}", streamId, record); return; } - streamMetadata.ranges.put(record.rangeIndex(), RangeMetadata.of(record)); + streamMetadata.ranges().put(record.rangeIndex(), RangeMetadata.of(record)); } public void replay(RemoveRangeRecord record) { @@ -1102,7 +1012,7 @@ public void replay(RemoveRangeRecord record) { log.error("streamId={} not exist when replay remove range record {}", streamId, record); return; } - streamMetadata.ranges.remove(record.rangeIndex()); + streamMetadata.ranges().remove(record.rangeIndex()); } public void replay(NodeWALMetadataRecord record) { @@ -1110,12 +1020,13 @@ public void replay(NodeWALMetadataRecord record) { long nodeEpoch = record.nodeEpoch(); // already exist, update the node's self metadata if (this.nodesMetadata.containsKey(nodeId)) { - NodeS3StreamSetObjectMetadata nodeMetadata = this.nodesMetadata.get(nodeId); - nodeMetadata.nodeEpoch.set(nodeEpoch); + NodeMetadata nodeMetadata = this.nodesMetadata.get(nodeId); + nodeMetadata.setNodeEpoch(nodeEpoch); + nodeMetadata.setFailoverMode(record.failoverMode()); return; } // not 
exist, create a new node - this.nodesMetadata.put(nodeId, new NodeS3StreamSetObjectMetadata(nodeId, nodeEpoch, this.snapshotRegistry)); + this.nodesMetadata.put(nodeId, new NodeMetadata(nodeId, nodeEpoch, record.failoverMode(), this.snapshotRegistry)); } public void replay(S3StreamSetObjectRecord record) { @@ -1124,7 +1035,7 @@ public void replay(S3StreamSetObjectRecord record) { long orderId = record.orderId(); long dataTs = record.dataTimeInMs(); List streamIndexes = record.streamsIndex(); - NodeS3StreamSetObjectMetadata nodeMetadata = this.nodesMetadata.get(nodeId); + NodeMetadata nodeMetadata = this.nodesMetadata.get(nodeId); if (nodeMetadata == null) { // should not happen log.error("nodeId={} not exist when replay stream set object record {}", nodeId, record); @@ -1135,7 +1046,7 @@ public void replay(S3StreamSetObjectRecord record) { Map indexMap = streamIndexes .stream() .collect(Collectors.toMap(StreamIndex::streamId, Convertor::to)); - nodeMetadata.streamSetObjects.put(objectId, new S3StreamSetObject(objectId, nodeId, indexMap, orderId, dataTs)); + nodeMetadata.streamSetObjects().put(objectId, new S3StreamSetObject(objectId, nodeId, indexMap, orderId, dataTs)); // update range record.streamsIndex().forEach(index -> { @@ -1166,13 +1077,13 @@ public void replay(S3StreamSetObjectRecord record) { public void replay(RemoveStreamSetObjectRecord record) { long objectId = record.objectId(); - NodeS3StreamSetObjectMetadata walMetadata = this.nodesMetadata.get(record.nodeId()); + NodeMetadata walMetadata = this.nodesMetadata.get(record.nodeId()); if (walMetadata == null) { // should not happen log.error("node {} not exist when replay remove stream set object record {}", record.nodeId(), record); return; } - walMetadata.streamSetObjects.remove(objectId); + walMetadata.streamSetObjects().remove(objectId); } public void replay(S3StreamObjectRecord record) { @@ -1188,7 +1099,7 @@ public void replay(S3StreamObjectRecord record) { log.error("streamId={} not exist when replay stream object record {}", streamId, record); return; } - streamMetadata.streamObjects.put(objectId, new S3StreamObject(objectId, streamId, startOffset, endOffset, dataTs)); + streamMetadata.streamObjects().put(objectId, new S3StreamObject(objectId, streamId, startOffset, endOffset, dataTs)); // update range RangeMetadata rangeMetadata = streamMetadata.currentRangeMetadata(); if (rangeMetadata == null) { @@ -1215,7 +1126,7 @@ public void replay(RemoveS3StreamObjectRecord record) { log.error("streamId={} not exist when replay remove stream object record {}", streamId, record); return; } - streamMetadata.streamObjects.remove(objectId); + streamMetadata.streamObjects().remove(objectId); } public void replay(RemoveNodeWALMetadataRecord record) { @@ -1228,7 +1139,7 @@ public Map streamsMetadata() { return streamsMetadata; } - public Map nodesMetadata() { + public Map nodesMetadata() { return nodesMetadata; } @@ -1245,4 +1156,5 @@ public String toString() { ", nodesMetadata=" + nodesMetadata + '}'; } + } diff --git a/metadata/src/main/java/org/apache/kafka/image/FailoverContextDelta.java b/metadata/src/main/java/org/apache/kafka/image/FailoverContextDelta.java new file mode 100644 index 0000000000..d4adf809c7 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/FailoverContextDelta.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.metadata.FailoverContextRecord; +import org.apache.kafka.metadata.stream.FailoverStatus; + +import java.util.HashMap; +import java.util.Map; + +public class FailoverContextDelta { + private final FailoverContextImage image; + private final Map changed = new HashMap<>(); + + public FailoverContextDelta(FailoverContextImage image) { + this.image = image; + } + + public void replay(FailoverContextRecord record) { + changed.put(record.failedNodeId(), record); + } + + public FailoverContextImage apply() { + Map newContexts = new HashMap<>(image.contexts()); + for (FailoverContextRecord record : changed.values()) { + if (record.status().equals(FailoverStatus.DONE.name())) { + newContexts.remove(record.failedNodeId()); + } else { + newContexts.put(record.failedNodeId(), record); + } + } + return new FailoverContextImage(newContexts); + } + + @Override + public String toString() { + return "FailoverContextDelta(" + " changed=" + changed + ')'; + } +} diff --git a/metadata/src/main/java/org/apache/kafka/image/FailoverContextImage.java b/metadata/src/main/java/org/apache/kafka/image/FailoverContextImage.java new file mode 100644 index 0000000000..f253119b7c --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/FailoverContextImage.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.metadata.FailoverContextRecord; +import org.apache.kafka.image.writer.ImageWriter; +import org.apache.kafka.image.writer.ImageWriterOptions; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +public class FailoverContextImage { + public static final FailoverContextImage EMPTY = new FailoverContextImage(Collections.emptyMap()); + + private final Map contexts; + + public FailoverContextImage(final Map contexts) { + this.contexts = contexts; + } + + public Map contexts() { + return contexts; + } + + public void write(ImageWriter writer, ImageWriterOptions options) { + contexts.values().forEach(r -> writer.write(0, r)); + } + + public boolean isEmpty() { + return contexts.isEmpty(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FailoverContextImage that = (FailoverContextImage) o; + return Objects.equals(contexts, that.contexts); + } + + @Override + public int hashCode() { + return Objects.hash(contexts); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java index b3c06fb7d0..0231d3da1f 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; +import org.apache.kafka.common.metadata.FailoverContextRecord; import org.apache.kafka.common.metadata.NodeWALMetadataRecord; import org.apache.kafka.common.metadata.ClientQuotaRecord; import org.apache.kafka.common.metadata.ConfigRecord; @@ -98,6 +99,8 @@ public MetadataDelta build() { private KVDelta kvDelta = null; + private FailoverContextDelta failoverContextDelta = null; + // AutoMQ for Kafka inject end public MetadataDelta(MetadataImage image) { @@ -216,6 +219,13 @@ public KVDelta getOrCreateKVDelta() { return kvDelta; } + public FailoverContextDelta getOrCreateFailoverContextDelta() { + if (failoverContextDelta == null) { + failoverContextDelta = new FailoverContextDelta(image.failoverContext()); + } + return failoverContextDelta; + } + // AutoMQ for Kafka inject end public Optional metadataVersionChanged() { @@ -481,6 +491,10 @@ public void replay(RemoveKVRecord record) { getOrCreateKVDelta().replay(record); } + public void replay(FailoverContextRecord record) { + getOrCreateFailoverContextDelta().replay(record); + } + // AutoMQ for Kafka inject end /** @@ -545,6 +559,7 @@ public MetadataImage apply(MetadataProvenance provenance) { S3StreamsMetadataImage newStreamMetadata = getNewS3StreamsMetadataImage(); S3ObjectsImage newS3ObjectsMetadata = getNewS3ObjectsMetadataImage(); KVImage newKVImage = getNewKVImage(); + FailoverContextImage failoverContextImage = getNewFailoverContextImage(); // AutoMQ for Kafka inject end return new MetadataImage( provenance, @@ -557,7 +572,8 @@ public MetadataImage apply(MetadataProvenance provenance) { newAcls, newStreamMetadata, newS3ObjectsMetadata, - newKVImage + newKVImage, + failoverContextImage ); } @@ -578,6 +594,11 @@ private KVImage getNewKVImage() { image.kv() : kvDelta.apply(); } + private FailoverContextImage getNewFailoverContextImage() { + return failoverContextDelta == null ? 
+ image.failoverContext() : failoverContextDelta.apply(); + } + // AutoMQ for Kafka inject end @Override diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java index db9c4aeebe..71c6499209 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java @@ -41,7 +41,8 @@ public final class MetadataImage { AclsImage.EMPTY, S3StreamsMetadataImage.EMPTY, S3ObjectsImage.EMPTY, - KVImage.EMPTY); + KVImage.EMPTY, + FailoverContextImage.EMPTY); private final MetadataProvenance provenance; @@ -67,6 +68,8 @@ public final class MetadataImage { private final KVImage kv; + private final FailoverContextImage failover; + // AutoMQ for Kafka inject end public MetadataImage( @@ -80,7 +83,8 @@ public MetadataImage( AclsImage acls, S3StreamsMetadataImage streamMetadata, S3ObjectsImage s3ObjectsImage, - KVImage kvImage + KVImage kvImage, + FailoverContextImage failover ) { this.provenance = provenance; this.features = features; @@ -93,6 +97,7 @@ public MetadataImage( this.streamMetadata = streamMetadata; this.objectsMetadata = s3ObjectsImage; this.kv = kvImage; + this.failover = failover; } public boolean isEmpty() { @@ -105,7 +110,8 @@ public boolean isEmpty() { acls.isEmpty() && streamMetadata.isEmpty() && objectsMetadata.isEmpty() && - kv.isEmpty(); + kv.isEmpty() && + failover.isEmpty(); } public MetadataProvenance provenance() { @@ -162,6 +168,10 @@ public KVImage kv() { return kv; } + public FailoverContextImage failoverContext() { + return failover; + } + // AutoMQ for Kafka inject end public void write(ImageWriter writer, ImageWriterOptions options) { @@ -178,6 +188,7 @@ public void write(ImageWriter writer, ImageWriterOptions options) { streamMetadata.write(writer, options); objectsMetadata.write(writer, options); kv.write(writer, options); + failover.write(writer, options); // AutoMQ for Kafka inject end writer.close(true); } @@ -196,7 +207,8 @@ public boolean equals(Object o) { acls.equals(other.acls) && streamMetadata.equals(other.streamMetadata) && objectsMetadata.equals(other.objectsMetadata) && - kv.equals(other.kv); + kv.equals(other.kv) && + failover.equals(other.failover); } @Override @@ -212,7 +224,8 @@ public int hashCode() { acls, streamMetadata, objectsMetadata, - kv); + kv, + failover); } @Override @@ -229,6 +242,7 @@ public String toString() { ", streamMetadata=" + streamMetadata + ", objectsMetadata=" + objectsMetadata + ", kv=" + kv + + ", failover=" + failover + ")"; } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/FailoverStatus.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/FailoverStatus.java new file mode 100644 index 0000000000..9330d27050 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/FailoverStatus.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.stream; + +public enum FailoverStatus { + WAITING, RECOVERING, DONE +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Config.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Config.java deleted file mode 100644 index f0719dc622..0000000000 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Config.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.metadata.stream; - -/** - * S3Config contains the configuration of S3, such as the bucket name, the region, etc. 
- */ -public class S3Config { - public static final String ACCESS_KEY_NAME = "KAFKA_S3_ACCESS_KEY"; - public static final String SECRET_KEY_NAME = "KAFKA_S3_SECRET_KEY"; - - // Only for test, if true, use mocked S3 related classes - private final boolean mock; - - private final long objectRetentionTimeInSecond; - - private final String endpoint; - - private final String region; - - private final String bucket; - - private final String accessKey = System.getenv(ACCESS_KEY_NAME); - - private final String secretKey = System.getenv(SECRET_KEY_NAME); - - // Only for test - public S3Config(final boolean mock) { - this(null, null, null, -1, mock); - } - - public S3Config(final String endpoint, final String region, final String bucket, final long objectRetentionTimeInSecond) { - this(endpoint, region, bucket, objectRetentionTimeInSecond, false); - } - - public S3Config(final String endpoint, final String region, final String bucket, final long objectRetentionTimeInSecond, - final boolean mock) { - this.endpoint = endpoint; - this.region = region; - this.bucket = bucket; - this.objectRetentionTimeInSecond = objectRetentionTimeInSecond; - this.mock = mock; - } - - public String region() { - return region; - } - - public String bucket() { - return bucket; - } - - public String endpoint() { - return endpoint; - } - - public boolean mock() { - return mock; - } - - public long objectRetentionTimeInSecond() { - return objectRetentionTimeInSecond; - } - - public String getAccessKey() { - return accessKey; - } - - public String getSecretKey() { - return secretKey; - } -} diff --git a/metadata/src/main/resources/common/metadata/FailoverContextRecord.json b/metadata/src/main/resources/common/metadata/FailoverContextRecord.json new file mode 100644 index 0000000000..817d823269 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/FailoverContextRecord.json @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 601, + "type": "metadata", + "name": "FailoverContextRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "FailedNodeId", + "type": "int32", + "versions": "0+", + "about": "The failed node id." + }, + { + "name": "VolumeId", + "type": "string", + "versions": "0+", + "about": "The failed node data volume id." + }, + { + "name": "Device", + "type": "string", + "versions": "0+", + "about": "The device name of volume attach to the new node." + }, + { + "name": "TargetNodeId", + "type": "int32", + "versions": "0+", + "about": "The target node which is responsible for failover." + }, + { + "name": "Status", + "type": "string", + "versions": "0+", + "about": "The failover status: WAITING | RECOVERING | DONE." 
+ } + ] +} \ No newline at end of file diff --git a/metadata/src/main/resources/common/metadata/NodeWALMetadataRecord.json b/metadata/src/main/resources/common/metadata/NodeWALMetadataRecord.json index ef0efd0619..6a9ae0902e 100644 --- a/metadata/src/main/resources/common/metadata/NodeWALMetadataRecord.json +++ b/metadata/src/main/resources/common/metadata/NodeWALMetadataRecord.json @@ -31,6 +31,12 @@ "type": "int64", "versions": "0+", "about": "The node epoch." + }, + { + "name": "FailoverMode", + "type": "bool", + "versions": "0+", + "about": "The failover mode enabled or not." } ] } \ No newline at end of file diff --git a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java index 8859105d88..de8a86af41 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import com.automq.stream.s3.Config; import com.automq.stream.s3.operator.S3Operator; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; @@ -37,7 +38,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.stream.S3ObjectControlManager; -import org.apache.kafka.metadata.stream.S3Config; import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectState; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -59,7 +59,7 @@ public class S3ObjectControlManagerTest { private static final String S3_REGION = "us-east-1"; private static final String S3_BUCKET = "kafka-on-S3-bucket"; - private static final S3Config S3_CONFIG = new S3Config(S3_ENDPOINT, S3_REGION, S3_BUCKET, 5); + private static final Config S3_CONFIG = new Config().endpoint(S3_ENDPOINT).region(S3_REGION).bucket(S3_BUCKET).objectRetentionTimeInSecond(5); private S3ObjectControlManager manager; private QuorumController controller; private S3Operator operator; diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index 9ab2628e4f..4df6bea222 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -55,8 +55,8 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.stream.S3ObjectControlManager; import org.apache.kafka.controller.stream.StreamControlManager; -import org.apache.kafka.controller.stream.StreamControlManager.NodeS3StreamSetObjectMetadata; -import org.apache.kafka.controller.stream.StreamControlManager.S3StreamMetadata; +import org.apache.kafka.controller.stream.NodeMetadata; +import org.apache.kafka.controller.stream.S3StreamMetadata; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamSetObject; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -826,13 +826,13 @@ public void testTrim() { assertEquals(60, rangeMetadata.startOffset()); assertEquals(70, rangeMetadata.endOffset()); assertEquals(0, streamMetadata.streamObjects().size()); - NodeS3StreamSetObjectMetadata node0Metadata = 
manager.nodesMetadata().get(BROKER0); + NodeMetadata node0Metadata = manager.nodesMetadata().get(BROKER0); assertEquals(1, node0Metadata.streamSetObjects().size()); S3StreamSetObject s3StreamSetObject = node0Metadata.streamSetObjects().get(1L); assertEquals(1, s3StreamSetObject.offsetRanges().size()); StreamOffsetRange range = s3StreamSetObject.offsetRanges().get(STREAM0); assertNull(range); - NodeS3StreamSetObjectMetadata node1Metadata = manager.nodesMetadata().get(BROKER1); + NodeMetadata node1Metadata = manager.nodesMetadata().get(BROKER1); assertEquals(1, node1Metadata.streamSetObjects().size()); s3StreamSetObject = node1Metadata.streamSetObjects().get(3L); assertEquals(1, s3StreamSetObject.offsetRanges().size()); diff --git a/metadata/src/test/java/org/apache/kafka/image/FailoverContextImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FailoverContextImageTest.java new file mode 100644 index 0000000000..6853f8d010 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/image/FailoverContextImageTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.metadata.FailoverContextRecord; +import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.image.writer.RecordListWriter; +import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.metadata.stream.FailoverStatus; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class FailoverContextImageTest { + final static FailoverContextImage IMAGE1; + final static List DELTA1_RECORDS = new ArrayList<>(); + final static FailoverContextDelta DELTA1; + final static FailoverContextImage IMAGE2; + + static { + { + FailoverContextRecord r1 = new FailoverContextRecord(); + r1.setFailedNodeId(1); + r1.setStatus(FailoverStatus.WAITING.name()); + FailoverContextRecord r2 = new FailoverContextRecord(); + r2.setFailedNodeId(2); + r2.setStatus(FailoverStatus.RECOVERING.name()); + r2.setTargetNodeId(233); + Map map = Map.of(1, r1, 2, r2); + IMAGE1 = new FailoverContextImage(map); + } + + { + FailoverContextRecord r1 = new FailoverContextRecord(); + r1.setFailedNodeId(1); + r1.setStatus(FailoverStatus.RECOVERING.name()); + r1.setTargetNodeId(234); + FailoverContextRecord r2 = new FailoverContextRecord(); + r2.setFailedNodeId(2); + r2.setStatus(FailoverStatus.DONE.name()); + r2.setTargetNodeId(233); + Map map = Map.of(1, r1, 2, r2); + DELTA1_RECORDS.add(new ApiMessageAndVersion(r1, (short) 0)); + DELTA1_RECORDS.add(new ApiMessageAndVersion(r2, (short) 0)); + DELTA1 = new FailoverContextDelta(IMAGE1); + RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); + } + + { + FailoverContextRecord r1 = new FailoverContextRecord(); + r1.setFailedNodeId(1); + r1.setStatus(FailoverStatus.RECOVERING.name()); + r1.setTargetNodeId(234); + Map map = Map.of(1, r1); + IMAGE2 = new FailoverContextImage(map); + } + } + + @Test + public void testImage1RoundTrip() { + testToImageAndBack(IMAGE1); + } + + @Test + public void testApplyDelta1() { + assertEquals(IMAGE2, DELTA1.apply()); + } + + private void testToImageAndBack(FailoverContextImage image) { + RecordListWriter writer = new RecordListWriter(); + ImageWriterOptions options = new ImageWriterOptions.Builder().build(); + image.write(writer, options); + FailoverContextDelta delta = new FailoverContextDelta(FailoverContextImage.EMPTY); + RecordTestUtils.replayAll(delta, writer.records()); + FailoverContextImage newImage = delta.apply(); + assertEquals(image, newImage); + } +} \ No newline at end of file diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java index 1820f355bb..ed8cc46f95 100644 --- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java @@ -46,7 +46,8 @@ public class MetadataImageTest { AclsImageTest.IMAGE1, S3StreamsMetadataImageTest.IMAGE1, S3ObjectsImageTest.IMAGE1, - KVImageTest.IMAGE1); + KVImageTest.IMAGE1, + FailoverContextImageTest.IMAGE1); DELTA1 = new MetadataDelta.Builder(). setImage(IMAGE1). 
@@ -61,6 +62,7 @@ public class MetadataImageTest { RecordTestUtils.replayAll(DELTA1, S3StreamsMetadataImageTest.DELTA1_RECORDS); RecordTestUtils.replayAll(DELTA1, S3ObjectsImageTest.DELTA1_RECORDS); RecordTestUtils.replayAll(DELTA1, KVImageTest.DELTA1_RECORDS); + RecordTestUtils.replayAll(DELTA1, FailoverContextImageTest.DELTA1_RECORDS); IMAGE2 = new MetadataImage( new MetadataProvenance(200, 5, 4000), @@ -73,7 +75,8 @@ public class MetadataImageTest { AclsImageTest.IMAGE2, S3StreamsMetadataImageTest.IMAGE2, S3ObjectsImageTest.IMAGE2, - KVImageTest.IMAGE2); + KVImageTest.IMAGE2, + FailoverContextImageTest.IMAGE2); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/image/NodeS3StreamSetObjectMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java similarity index 99% rename from metadata/src/test/java/org/apache/kafka/image/NodeS3StreamSetObjectMetadataImageTest.java rename to metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java index 6c34426032..0b274ad7bb 100644 --- a/metadata/src/test/java/org/apache/kafka/image/NodeS3StreamSetObjectMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java @@ -40,7 +40,7 @@ @Timeout(value = 40) @Tag("S3Unit") -public class NodeS3StreamSetObjectMetadataImageTest { +public class NodeMetadataImageTest { private static final int BROKER0 = 0;
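
For quick reference, the node-fencing behavior introduced in StreamControlManager above can be condensed as follows. This is an illustrative restatement of the checks already shown in the diff (the three-argument nodeEpochCheck plus the FailoverMode flag carried by CommitStreamSetObjectRequest), not additional committed code; the wrapper class and method placement are made up for the example.

    package org.apache.kafka.controller.stream;

    import org.apache.kafka.common.protocol.Errors;

    // Condensed restatement of the controller-side node check shown in the diff above.
    // A node whose metadata has failoverMode=true is fenced for normal stream operations
    // (NODE_FENCED), but a commit sent with the request-level FailoverMode flag skips the
    // fence so the node that took over the failed node's WAL can still commit on its behalf.
    public final class NodeFenceCheckSketch {

        private NodeFenceCheckSketch() {
        }

        // checkFailover is false when the request itself is a failover commit,
        // i.e. nodeEpochCheck(nodeId, nodeEpoch, !data.failoverMode()) in the diff.
        public static Errors nodeEpochCheck(NodeMetadata node, long requestEpoch, boolean checkFailover) {
            if (node == null) {
                return Errors.NODE_EPOCH_NOT_EXIST;   // node never registered its WAL metadata
            }
            if (node.getNodeEpoch() > requestEpoch) {
                return Errors.NODE_EPOCH_EXPIRED;     // request carries a stale node epoch
            }
            if (checkFailover && node.getFailoverMode()) {
                return Errors.NODE_FENCED;            // node is being failed over, reject normal traffic
            }
            return Errors.NONE;
        }
    }
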
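
The FailoverContextImage / FailoverContextDelta pair added above follows the usual image/delta pattern: replay() collects FailoverContextRecords keyed by failedNodeId, and apply() upserts them into a new image, dropping any context whose status has reached DONE. A small usage sketch mirroring the semantics exercised by FailoverContextImageTest; the chained setters assume the fluent style of Kafka's generated record classes, as seen elsewhere in this diff.

    package org.apache.kafka.image;

    import org.apache.kafka.common.metadata.FailoverContextRecord;
    import org.apache.kafka.metadata.stream.FailoverStatus;

    // Usage sketch for the new failover-context image/delta classes added in this diff.
    public class FailoverContextUsageSketch {

        public static void main(String[] args) {
            // A failed node enters the failover context in WAITING state.
            FailoverContextRecord waiting = new FailoverContextRecord()
                .setFailedNodeId(1)
                .setStatus(FailoverStatus.WAITING.name());

            FailoverContextDelta delta = new FailoverContextDelta(FailoverContextImage.EMPTY);
            delta.replay(waiting);
            FailoverContextImage image = delta.apply();
            System.out.println(image.contexts().keySet());   // [1]

            // Once the failover finishes, a DONE record removes the context from the image.
            FailoverContextRecord done = new FailoverContextRecord()
                .setFailedNodeId(1)
                .setTargetNodeId(2)
                .setStatus(FailoverStatus.DONE.name());

            delta = new FailoverContextDelta(image);
            delta.replay(done);
            image = delta.apply();
            System.out.println(image.isEmpty());             // true
        }
    }
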