diff --git a/core/src/main/java/kafka/automq/failover/K8sFailedNode.java b/core/src/main/java/kafka/automq/failover/DefaultFailedNode.java similarity index 55% rename from core/src/main/java/kafka/automq/failover/K8sFailedNode.java rename to core/src/main/java/kafka/automq/failover/DefaultFailedNode.java index 9d02a2d0c3..9a1e06ec27 100644 --- a/core/src/main/java/kafka/automq/failover/K8sFailedNode.java +++ b/core/src/main/java/kafka/automq/failover/DefaultFailedNode.java @@ -19,37 +19,5 @@ package kafka.automq.failover; -import java.util.Objects; - -public final class K8sFailedNode implements FailedNode { - private final int id; - - public K8sFailedNode(int id) { - this.id = id; - } - - public int id() { - return id; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) - return true; - if (obj == null || obj.getClass() != this.getClass()) - return false; - var that = (K8sFailedNode) obj; - return this.id == that.id; - } - - @Override - public int hashCode() { - return Objects.hash(id); - } - - @Override - public String toString() { - return "K8sFailedNode[" + - "id=" + id + ']'; - } +public record DefaultFailedNode(int id, long epoch) implements FailedNode { } diff --git a/core/src/main/java/kafka/automq/failover/FailedNode.java b/core/src/main/java/kafka/automq/failover/FailedNode.java index 8cba001851..a6d83aef95 100644 --- a/core/src/main/java/kafka/automq/failover/FailedNode.java +++ b/core/src/main/java/kafka/automq/failover/FailedNode.java @@ -27,10 +27,10 @@ public interface FailedNode { int id(); static FailedNode from(NodeRuntimeMetadata node) { - return new K8sFailedNode(node.id()); + return new DefaultFailedNode(node.id(), node.epoch()); } static FailedNode from(FailoverContext context) { - return new K8sFailedNode(context.getNodeId()); + return new DefaultFailedNode(context.getNodeId(), context.getNodeEpoch()); } } diff --git a/core/src/main/java/kafka/automq/failover/FailoverControlManager.java b/core/src/main/java/kafka/automq/failover/FailoverControlManager.java index 62375ed461..13b1ee4056 100644 --- a/core/src/main/java/kafka/automq/failover/FailoverControlManager.java +++ b/core/src/main/java/kafka/automq/failover/FailoverControlManager.java @@ -181,7 +181,7 @@ private List allNodes() { node.getNodeId(), // There are node epochs in both streamControlManager and nodeControlManager, and they are the same in most cases. // However, in some rare cases, the node epoch in streamControlManager may be updated earlier than the node epoch in nodeControlManager. - // So we use the node epoch in nodeControlManager as the source of truth. + // So we use the node epoch in streamControlManager as the source of truth. nodeEpochMap.get(node.getNodeId()), node.getWalConfig(), node.getTags(), diff --git a/core/src/main/scala/kafka/log/stream/s3/wal/BootstrapWalV1.java b/core/src/main/scala/kafka/log/stream/s3/wal/BootstrapWalV1.java index 646ac0ca64..6999d00aab 100644 --- a/core/src/main/scala/kafka/log/stream/s3/wal/BootstrapWalV1.java +++ b/core/src/main/scala/kafka/log/stream/s3/wal/BootstrapWalV1.java @@ -183,8 +183,8 @@ public CompletableFuture trim(RecordOffset offset) { private CompletableFuture buildRecoverWal(String kraftWalConfigs, long oldNodeEpoch) { IdURI uri = IdURI.parse(kraftWalConfigs); CompletableFuture cf = walHandle - .acquirePermission(nodeId, oldNodeEpoch, uri, new WalHandle.AcquirePermissionOptions().failoverMode(failoverMode)); - return cf.thenApplyAsync(nil -> factory.build(uri, BuildOptions.builder().nodeEpoch(oldNodeEpoch).openMode(failoverMode ? OpenMode.FAILOVER : OpenMode.READ_WRITE).build()), executor); + .acquirePermission(nodeId, oldNodeEpoch, uri, new WalHandle.AcquirePermissionOptions().failoverMode(true)); + return cf.thenApplyAsync(nil -> factory.build(uri, BuildOptions.builder().nodeEpoch(oldNodeEpoch).openMode(OpenMode.FAILOVER).build()), executor); } private CompletableFuture buildWal(String kraftWalConfigs) {