From ab7d56aaea7e313addd8578ab201d4935e93a617 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 4 May 2018 09:31:51 -0400 Subject: [PATCH] NIFI-5153: If a node is disconnected due to failure to complete mutable request, the node should be allowed to rejoin --- .../coordination/node/NodeClusterCoordinator.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 754d3708ec8a..4e4625cc9607 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -373,7 +373,7 @@ public Map> getConnectionStates() { final Map> connectionStates = new HashMap<>(); for (final Map.Entry entry : nodeStatuses.entrySet()) { final NodeConnectionState state = entry.getValue().getState(); - final List nodeIds = connectionStates.computeIfAbsent(state, s -> new ArrayList()); + final List nodeIds = connectionStates.computeIfAbsent(state, s -> new ArrayList<>()); nodeIds.add(entry.getKey()); } @@ -998,9 +998,12 @@ public void afterRequest(final String uriPath, final String method, final Set failedNodeIds = problematicNodeResponses.stream().map(response -> response.getNodeId()).collect(Collectors.toSet()); - logger.warn(String.format("The following nodes failed to process URI %s '%s'. Requesting each node disconnect from cluster.", uriPath, failedNodeIds)); + logger.warn(String.format("The following nodes failed to process URI %s '%s'. Requesting each node reconnect to cluster.", uriPath, failedNodeIds)); for (final NodeIdentifier nodeId : failedNodeIds) { - requestNodeDisconnect(nodeId, DisconnectionCode.FAILED_TO_SERVICE_REQUEST, "Failed to process request " + method + " " + uriPath); + // Update the node to 'CONNECTING' status and request that the node connect + final NodeConnectionStatus reconnectionStatus = new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING); + updateNodeStatus(reconnectionStatus); + requestNodeConnect(nodeId, null); } } }