Skip to content

Commit

Permalink
Extract AtomicRegisterPreVoteCollector and StoreHeartbeatService to p…
Browse files Browse the repository at this point in the history
…roduction (#95296)
  • Loading branch information
fcofdez committed Apr 17, 2023
1 parent 7b994ba commit da21447
Show file tree
Hide file tree
Showing 12 changed files with 746 additions and 153 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.coordination.stateless;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.PreVoteCollector;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.Releasable;

import java.util.concurrent.atomic.AtomicBoolean;

public class AtomicRegisterPreVoteCollector extends PreVoteCollector {
private final StoreHeartbeatService heartbeatService;
private final Runnable startElection;

public AtomicRegisterPreVoteCollector(StoreHeartbeatService heartbeatService, Runnable startElection) {
this.heartbeatService = heartbeatService;
this.startElection = startElection;
}

@Override
public Releasable start(ClusterState clusterState, Iterable<DiscoveryNode> broadcastNodes) {
final var shouldRun = new AtomicBoolean(true);
heartbeatService.runIfNoRecentLeader(() -> {
if (shouldRun.getAndSet(false)) {
startElection.run();
}
});

return () -> shouldRun.set(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.coordination.stateless;

import org.elasticsearch.cluster.node.DiscoveryNode;

public record Heartbeat(DiscoveryNode leader, long term, long absoluteTimeInMillis) {
long timeSinceLastHeartbeatInMillis(long nowInMillis) {
return nowInMillis - absoluteTimeInMillis;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.coordination.stateless;

import org.elasticsearch.action.ActionListener;

public interface HeartbeatStore {
void writeHeartbeat(Heartbeat newHeartbeat, ActionListener<Void> listener);

void readLatestHeartbeat(ActionListener<Heartbeat> listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.coordination.stateless;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.coordination.LeaderHeartbeatService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.function.LongSupplier;

public class StoreHeartbeatService implements LeaderHeartbeatService {
public static final Setting<TimeValue> HEARTBEAT_FREQUENCY = Setting.timeSetting(
"heartbeat_frequency",
TimeValue.timeValueSeconds(15),
Setting.Property.NodeScope
);

public static final Setting<Integer> MAX_MISSED_HEARTBEATS = Setting.intSetting(
"max_missed_heartbeats",
2,
1,
Setting.Property.NodeScope
);

private static final Logger logger = LogManager.getLogger(StoreHeartbeatService.class);

private final HeartbeatStore heartbeatStore;
private final ThreadPool threadPool;
private final TimeValue heartbeatFrequency;
private final TimeValue maxTimeSinceLastHeartbeat;
private final LongSupplier currentTermSupplier;

private volatile HeartbeatTask heartbeatTask;

public StoreHeartbeatService(
HeartbeatStore heartbeatStore,
ThreadPool threadPool,
TimeValue heartbeatFrequency,
TimeValue maxTimeSinceLastHeartbeat,
LongSupplier currentTermSupplier
) {
this.heartbeatStore = heartbeatStore;
this.threadPool = threadPool;
this.heartbeatFrequency = heartbeatFrequency;
this.maxTimeSinceLastHeartbeat = maxTimeSinceLastHeartbeat;
this.currentTermSupplier = currentTermSupplier;
}

@Override
public void start(DiscoveryNode currentLeader, long term, ActionListener<Long> completionListener) {
final var newHeartbeatTask = new HeartbeatTask(currentLeader, term, completionListener);
heartbeatTask = newHeartbeatTask;
newHeartbeatTask.run();
}

@Override
public void stop() {
heartbeatTask = null;
}

protected long absoluteTimeInMillis() {
return threadPool.absoluteTimeInMillis();
}

void runIfNoRecentLeader(Runnable runnable) {
heartbeatStore.readLatestHeartbeat(new ActionListener<>() {
@Override
public void onResponse(Heartbeat heartBeat) {
if (heartBeat == null
|| maxTimeSinceLastHeartbeat.millis() <= heartBeat.timeSinceLastHeartbeatInMillis(absoluteTimeInMillis())) {
runnable.run();
} else {
logger.trace("runIfNoRecentLeader: found recent leader");
}
}

@Override
public void onFailure(Exception e) {
logger.trace("runIfNoRecentLeader: readLatestHeartbeat failed", e);
}
});
}

private class HeartbeatTask extends ActionRunnable<Long> {
private final DiscoveryNode currentLeader;
private final long heartbeatTerm;
private final ActionListener<Void> rerunListener;

HeartbeatTask(DiscoveryNode currentLeader, long heartbeatTerm, ActionListener<Long> listener) {
super(listener);
this.currentLeader = currentLeader;
this.heartbeatTerm = heartbeatTerm;
this.rerunListener = listener.delegateFailure((l, v) -> {
try {
threadPool.schedule(HeartbeatTask.this, heartbeatFrequency, ThreadPool.Names.GENERIC);
} catch (Exception e) {
l.onFailure(e);
}
});
}

@Override
protected void doRun() throws Exception {
if (heartbeatTask != HeartbeatTask.this) {
// already cancelled
return;
}

final var registerTerm = currentTermSupplier.getAsLong();
if (registerTerm == heartbeatTerm) {
heartbeatStore.writeHeartbeat(new Heartbeat(currentLeader, heartbeatTerm, absoluteTimeInMillis()), rerunListener);
} else {
assert heartbeatTerm < registerTerm;
listener.onResponse(registerTerm);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public DiscoveryModule(

var reconfigurator = getReconfigurator(settings, clusterSettings, clusterCoordinationPlugins);
var preVoteCollectorFactory = getPreVoteCollectorFactory(clusterCoordinationPlugins);
var leaderHeartbeatService = getLeaderHeartbeatService(settings, clusterCoordinationPlugins);

if (MULTI_NODE_DISCOVERY_TYPE.equals(discoveryType)
|| LEGACY_MULTI_NODE_DISCOVERY_TYPE.equals(discoveryType)
Expand All @@ -209,7 +210,7 @@ public DiscoveryModule(
nodeHealthService,
circuitBreakerService,
reconfigurator,
LeaderHeartbeatService.NO_OP,
leaderHeartbeatService,
preVoteCollectorFactory
);
} else {
Expand Down Expand Up @@ -259,6 +260,23 @@ static PreVoteCollector.Factory getPreVoteCollectorFactory(List<ClusterCoordinat
return StatefulPreVoteCollector::new;
}

static LeaderHeartbeatService getLeaderHeartbeatService(Settings settings, List<ClusterCoordinationPlugin> clusterCoordinationPlugins) {
final var heartbeatServices = clusterCoordinationPlugins.stream()
.map(plugin -> plugin.getLeaderHeartbeatService(settings))
.flatMap(Optional::stream)
.toList();

if (heartbeatServices.size() > 1) {
throw new IllegalStateException("multiple leader heart beat service factories found: " + heartbeatServices);
}

if (heartbeatServices.size() == 1) {
return heartbeatServices.get(0);
}

return LeaderHeartbeatService.NO_OP;
}

public static boolean isSingleNodeDiscovery(Settings settings) {
return SINGLE_NODE_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(settings));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.ElectionStrategy;
import org.elasticsearch.cluster.coordination.LeaderHeartbeatService;
import org.elasticsearch.cluster.coordination.PreVoteCollector;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -62,6 +63,10 @@ default Optional<PreVoteCollector.Factory> getPreVoteCollectorFactory() {
return Optional.empty();
}

default Optional<LeaderHeartbeatService> getLeaderHeartbeatService(Settings settings) {
return Optional.empty();
}

interface PersistedStateFactory {
CoordinationState.PersistedState createPersistedState(
Settings settings,
Expand Down

0 comments on commit da21447

Please sign in to comment.