Skip to content

Commit

Permalink
NIFI-7389 Makes Missable heartbeat counts configurable
Browse files Browse the repository at this point in the history
This closes apache#4236.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
  • Loading branch information
sushilkm authored and driesva committed Mar 19, 2021
1 parent c26141c commit c0ee075
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public abstract class NiFiProperties {

// cluster common properties
public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval";
public static final String CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX = "nifi.cluster.protocol.heartbeat.missable.max";
public static final String CLUSTER_PROTOCOL_IS_SECURE = "nifi.cluster.protocol.is.secure";

// cluster node properties
Expand Down Expand Up @@ -305,6 +306,7 @@ public abstract class NiFiProperties {

// cluster common defaults
public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";
public static final int DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX = 8;
public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "500 ms";
public static final int DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = 3;
public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec";
Expand Down
5 changes: 3 additions & 2 deletions nifi-docs/src/main/asciidoc/administration-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1629,8 +1629,8 @@ It just depends on the resources available and how the Administrator decides to

*Heartbeats*: The nodes communicate their health and status to the currently elected Cluster Coordinator via "heartbeats",
which let the Coordinator know they are still connected to the cluster and working properly. By default, the nodes emit
heartbeats every 5 seconds, and if the Cluster Coordinator does not receive a heartbeat from a node within 40 seconds, it
disconnects the node due to "lack of heartbeat". The 5-second setting is configurable in the _nifi.properties_ file (see
heartbeats every 5 seconds, and if the Cluster Coordinator does not receive a heartbeat from a node within 40 seconds (8 missed heartbeats at the 5-second interval), it
disconnects the node due to "lack of heartbeat". Both the 5-second heartbeat interval and the limit of 8 missable heartbeats are configurable in the _nifi.properties_ file (see
the <<cluster_common_properties>> section for more information). The Cluster Coordinator
disconnects the node because the Coordinator needs to ensure that every node in the cluster is in sync, and if a node
is not heard from regularly, the Coordinator cannot be sure it is still in sync with the rest of the cluster. If, after
Expand Down Expand Up @@ -3334,6 +3334,7 @@ When setting up a NiFi cluster, these properties should be configured the same w
|====
|*Property*|*Description*
|`nifi.cluster.protocol.heartbeat.interval`|The interval at which nodes should emit heartbeats to the Cluster Coordinator. The default value is `5 sec`.
|`nifi.cluster.protocol.heartbeat.missable.max`|The maximum number of heartbeats that the Cluster Coordinator can miss from a node before it marks that node as Disconnected. A node is disconnected once no heartbeat has been received from it for `nifi.cluster.protocol.heartbeat.interval` multiplied by this value. The default value is `8`.
|`nifi.cluster.protocol.is.secure`|This indicates whether cluster communications are secure. The default value is `false`.
|====

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {

private final int heartbeatIntervalMillis;
private final int missableHeartbeatCount;
private static final Logger logger = LoggerFactory.getLogger(AbstractHeartbeatMonitor.class);
protected final ClusterCoordinator clusterCoordinator;
protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true);
Expand All @@ -51,6 +52,9 @@ public AbstractHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, fin
NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS);

this.missableHeartbeatCount = nifiProperties.getIntegerProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX,
NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX);

// Register an event listener so that if any nodes are removed, we also remove the heartbeat.
// Otherwise, we'll have a condition where a node is removed from the Cluster Coordinator, but its heartbeat has already been received.
// As a result, when it is processed, we will ask the node to reconnect, adding it back to the cluster.
Expand Down Expand Up @@ -158,8 +162,8 @@ protected synchronized void monitorHeartbeats() {
procStopWatch.stop();
logger.info("Finished processing {} heartbeats in {}", latestHeartbeats.size(), procStopWatch.getDuration());

// Disconnect any node that hasn't sent a heartbeat in a long time (8 times the heartbeat interval)
final long maxMillis = heartbeatIntervalMillis * 8;
// Disconnect any node that hasn't sent a heartbeat in a long time (CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX times the heartbeat interval)
final long maxMillis = heartbeatIntervalMillis * missableHeartbeatCount;
final long currentTimestamp = System.currentTimeMillis();
final long threshold = currentTimestamp - maxMillis;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ nifi.security.user.knox.audiences=

# cluster common properties (all nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=5 sec
nifi.cluster.protocol.heartbeat.missable.max=8
nifi.cluster.protocol.is.secure=false

# cluster node properties (only configure for cluster nodes) #
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ nifi.security.user.knox.audiences=

# cluster common properties (all nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=5 sec
nifi.cluster.protocol.heartbeat.missable.max=8
nifi.cluster.protocol.is.secure=false

# cluster node properties (only configure for cluster nodes) #
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@

<!-- nifi.properties: cluster common properties (cluster manager and nodes must have same values) -->
<nifi.cluster.protocol.heartbeat.interval>5 sec</nifi.cluster.protocol.heartbeat.interval>
<nifi.cluster.protocol.heartbeat.missable.max>8</nifi.cluster.protocol.heartbeat.missable.max>
<nifi.cluster.protocol.is.secure>false</nifi.cluster.protocol.is.secure>

<!-- nifi.properties: cluster node properties (only configure for cluster nodes) -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ nifi.security.user.knox.audiences=${nifi.security.user.knox.audiences}

# cluster common properties (all nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=${nifi.cluster.protocol.heartbeat.interval}
nifi.cluster.protocol.heartbeat.missable.max=${nifi.cluster.protocol.heartbeat.missable.max}
nifi.cluster.protocol.is.secure=${nifi.cluster.protocol.is.secure}

# cluster node properties (only configure for cluster nodes) #
Expand Down

0 comments on commit c0ee075

Please sign in to comment.