diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 8f17d45d43..bf3ee49bce 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -128,7 +128,7 @@ versions += [ zookeeper: "3.6.3", zstd: "1.5.2-1", commonLang: "3.12.0", - s3stream: "0.6.3-SNAPSHOT", + s3stream: "0.6.4-SNAPSHOT", ] libs += [ activation: "javax.activation:activation:$versions.activation", diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/FailoverControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/FailoverControlManager.java index cffbb1b097..469b2b7228 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/FailoverControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/FailoverControlManager.java @@ -159,6 +159,7 @@ private void complete(List failedNodes, List r if (!failedNodeIdSet.contains(nodeId)) { FailoverContextRecord completedRecord = context.duplicate(); completedRecord.setStatus(FailoverStatus.DONE.name()); + LOGGER.info("failed node failover complete, {}", completedRecord); // the target node already complete the recover and delete the volume, so remove the failover context records.add(new ApiMessageAndVersion(completedRecord, (short) 0)); } @@ -207,7 +208,7 @@ void backgroundAttach0() { attachedRecord.setDevice(device); attachedRecord.setStatus(FailoverStatus.RECOVERING.name()); attached.put(failedNodeId, attachedRecord); - LOGGER.info("attach failed node {} to target node {} success", failedNodeId, broker.id()); + LOGGER.info("attach failed node {} to target node {} success, record {}", failedNodeId, broker.id(), attachedRecord); }).exceptionally(ex -> { LOGGER.error("attach failed node {} to target node {} failed", context.failedNodeId(), broker.id(), ex); return null; @@ -220,7 +221,11 @@ void backgroundAttach0() { } public void replay(FailoverContextRecord record) { - failoverContexts.put(record.failedNodeId(), record); + if (FailoverStatus.DONE.name().equals(record.status())) { + failoverContexts.remove(record.failedNodeId()); + } else { + failoverContexts.put(record.failedNodeId(), record); + } } /**