From d0333bb499daafbeb3580be5992949a93414e6c2 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 29 Jul 2016 09:14:17 -0400 Subject: [PATCH] NIFI-2431: Before registering for the Cluster Coordinator role, check if another node already has this role. If so, do not register for this role until after the node has joined the cluster and inherited the flow. --- .../nifi/controller/FlowController.java | 44 ++++++++++++++----- .../nifi/controller/StandardFlowService.java | 4 +- .../CuratorLeaderElectionManager.java | 3 +- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 264989527e2b..9ca8e30df134 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -571,7 +571,23 @@ public void run() { if (configuredForClustering) { leaderElectionManager = new CuratorLeaderElectionManager(4); heartbeater = new ClusterProtocolHeartbeater(protocolSender, properties); - registerForClusterCoordinator(); + + // Check if there is already a cluster coordinator elected. If not, go ahead + // and register for coordinator role. If there is already one elected, do not register until + // we have connected to the cluster. This allows us to avoid becoming the coordinator with a + // flow that is different from the rest of the cluster (especially an empty flow) and then + // kicking everyone out. This way, we instead inherit the cluster flow before we attempt to be + // the coordinator. + LOG.info("Checking if there is already a Cluster Coordinator Elected..."); + final NodeIdentifier electedCoordinatorNodeId = clusterCoordinator.getElectedActiveCoordinatorNode(); + if (electedCoordinatorNodeId == null) { + LOG.info("It appears that no Cluster Coordinator has been Elected yet. Registering for Cluster Coordinator Role."); + registerForClusterCoordinator(); + } else { + LOG.info("The Elected Cluster Coordinator is {}. Will not register to be elected for this role until after connecting " + + "to the cluster and inheriting the cluster's flow.", electedCoordinatorNodeId); + } + leaderElectionManager.start(); } else { leaderElectionManager = null; @@ -3279,6 +3295,20 @@ public synchronized void onLeaderElection() { }); } + private void registerForPrimaryNode() { + leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new LeaderElectionStateChangeListener() { + @Override + public void onLeaderElection() { + setPrimary(true); + } + + @Override + public void onLeaderRelinquish() { + setPrimary(false); + } + }); + } + /** * Sets whether this instance is clustered. Clustered means that a node is either connected or trying to connect to the cluster. * @@ -3315,17 +3345,7 @@ public void setClustered(final boolean clustered, final String clusterInstanceId // update the bulletin repository if (isChanging) { if (clustered) { - leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new LeaderElectionStateChangeListener() { - @Override - public void onLeaderElection() { - setPrimary(true); - } - - @Override - public void onLeaderRelinquish() { - setPrimary(false); - } - }); + registerForPrimaryNode(); // Participate in Leader Election for Heartbeat Monitor. Start the heartbeat monitor // if/when we become leader and stop it when we lose leader role diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 71d66b541634..1b26e402afcd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -772,12 +772,12 @@ private ConnectionResponse connect(final boolean retryOnCommsFailure, final bool try { response = senderListener.requestConnection(requestMsg).getConnectionResponse(); if (response.getRejectionReason() != null) { - logger.warn("Connection request was blocked by cluster manager with the explanation: " + response.getRejectionReason()); + logger.warn("Connection request was blocked by cluster coordinator with the explanation: " + response.getRejectionReason()); // set response to null and treat a firewall blockage the same as getting no response from manager response = null; break; } else if (response.shouldTryLater()) { - logger.info("Flow controller requested by cluster manager to retry connection in " + response.getTryLaterSeconds() + " seconds."); + logger.info("Flow controller requested by cluster coordinator to retry connection in " + response.getTryLaterSeconds() + " seconds."); try { Thread.sleep(response.getTryLaterSeconds() * 1000); } catch (final InterruptedException ie) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index bea643fd887b..19fa2259a97e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -90,7 +90,7 @@ public synchronized void register(final String roleName, final LeaderElectionSta logger.debug("{} Registering new Leader Selector for role {}", this, roleName); if (leaderRoles.containsKey(roleName)) { - logger.warn("{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName); + logger.info("{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName); return; } @@ -130,6 +130,7 @@ public synchronized void unregister(final String roleName) { } leaderSelector.close(); + logger.info("This node is no longer registered to be elected as the Leader for Role '{}'", roleName); } @Override