Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,5 @@

package kafka.automq.failover;

import java.util.Objects;

public final class K8sFailedNode implements FailedNode {
private final int id;

public K8sFailedNode(int id) {
this.id = id;
}

public int id() {
return id;
}

@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null || obj.getClass() != this.getClass())
return false;
var that = (K8sFailedNode) obj;
return this.id == that.id;
}

@Override
public int hashCode() {
return Objects.hash(id);
}

@Override
public String toString() {
return "K8sFailedNode[" +
"id=" + id + ']';
}
public record DefaultFailedNode(int id, long epoch) implements FailedNode {
}
4 changes: 2 additions & 2 deletions core/src/main/java/kafka/automq/failover/FailedNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ public interface FailedNode {
int id();

static FailedNode from(NodeRuntimeMetadata node) {
return new K8sFailedNode(node.id());
return new DefaultFailedNode(node.id(), node.epoch());
}

static FailedNode from(FailoverContext context) {
return new K8sFailedNode(context.getNodeId());
return new DefaultFailedNode(context.getNodeId(), context.getNodeEpoch());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private List<NodeRuntimeMetadata> allNodes() {
node.getNodeId(),
// There are node epochs in both streamControlManager and nodeControlManager, and they are the same in most cases.
// However, in some rare cases, the node epoch in streamControlManager may be updated earlier than the node epoch in nodeControlManager.
// So we use the node epoch in nodeControlManager as the source of truth.
// So we use the node epoch in streamControlManager as the source of truth.
nodeEpochMap.get(node.getNodeId()),
node.getWalConfig(),
node.getTags(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ public CompletableFuture<Void> trim(RecordOffset offset) {
private CompletableFuture<? extends WriteAheadLog> buildRecoverWal(String kraftWalConfigs, long oldNodeEpoch) {
IdURI uri = IdURI.parse(kraftWalConfigs);
CompletableFuture<Void> cf = walHandle
.acquirePermission(nodeId, oldNodeEpoch, uri, new WalHandle.AcquirePermissionOptions().failoverMode(failoverMode));
return cf.thenApplyAsync(nil -> factory.build(uri, BuildOptions.builder().nodeEpoch(oldNodeEpoch).openMode(failoverMode ? OpenMode.FAILOVER : OpenMode.READ_WRITE).build()), executor);
.acquirePermission(nodeId, oldNodeEpoch, uri, new WalHandle.AcquirePermissionOptions().failoverMode(true));
return cf.thenApplyAsync(nil -> factory.build(uri, BuildOptions.builder().nodeEpoch(oldNodeEpoch).openMode(OpenMode.FAILOVER).build()), executor);
}

private CompletableFuture<? extends WriteAheadLog> buildWal(String kraftWalConfigs) {
Expand Down
Loading