Skip to content

Commit

Permalink
https://issues.apache.org/jira/browse/ARTEMIS-1565
Browse files Browse the repository at this point in the history
  • Loading branch information
andytaylor committed Dec 19, 2017
1 parent d1c9bc0 commit 6067a28
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,12 @@ public static String getDefaultHapolicyBackupStrategy() {

public static boolean DEFAULT_VOTE_ON_REPLICATION_FAILURE = false;

// Number of times a quorum vote is retried before the server restarts as a backup.
// Declared final: it is only ever read through getDefaultVoteRetries().
private static final int DEFAULT_VOTE_RETRIES = 12;

// Pause between two successive quorum votes, in milliseconds (5000 ms = 5 seconds).
// Declared final: it is only ever read through getDefaultVoteRetryWait().
private static final long DEFAULT_VOTE_RETRY_WAIT = 5000;

public static int DEFAULT_QUORUM_SIZE = -1;

public static final boolean DEFAULT_ANALYZE_CRITICAL = true;
Expand Down Expand Up @@ -1334,4 +1340,11 @@ public static CriticalAnalyzerPolicy getCriticalAnalyzerPolicy() {
}


/**
 * @return the default number of times a quorum vote is retried before restarting as a backup
 */
public static int getDefaultVoteRetries() {
return DEFAULT_VOTE_RETRIES;
}

/**
 * @return the default wait between two quorum votes, in milliseconds
 */
public static long getDefaultVoteRetryWait() {
return DEFAULT_VOTE_RETRY_WAIT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ public static HAPolicy getHAPolicy(HAPolicyConfiguration conf,
}
case REPLICATED: {
ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize());
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait());
}
case REPLICA: {
ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize());
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait());
}
case SHARED_STORE_MASTER: {
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {

private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize();

private int voteRetries = ActiveMQDefaultConfiguration.getDefaultVoteRetries();

private long voteRetryWait = ActiveMQDefaultConfiguration.getDefaultVoteRetryWait();

public ReplicaPolicyConfiguration() {
}

Expand Down Expand Up @@ -139,4 +143,20 @@ public int getQuorumSize() {
/**
 * Stores the quorum size for this replica configuration. No validation is done here.
 *
 * @param quorumSize the quorum size to use
 */
public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize;
}

/**
 * @return how many times a quorum vote is retried; initialised from
 *         {@code ActiveMQDefaultConfiguration.getDefaultVoteRetries()}
 */
public int getVoteRetries() {
return voteRetries;
}

/**
 * Stores the number of quorum vote retries. The value is assigned as given, without validation.
 *
 * @param voteRetries how many times a quorum vote is retried
 */
public void setVoteRetries(int voteRetries) {
this.voteRetries = voteRetries;
}

/**
 * Stores the wait between two quorum votes. The value is assigned as given, without validation.
 *
 * @param voteRetryWait wait between votes, in milliseconds
 */
public void setVoteRetryWait(long voteRetryWait) {
this.voteRetryWait = voteRetryWait;
}

/**
 * @return the wait between two quorum votes, in milliseconds; initialised from
 *         {@code ActiveMQDefaultConfiguration.getDefaultVoteRetryWait()}
 */
public long getVoteRetryWait() {
return voteRetryWait;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {

private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize();

private int voteRetries = ActiveMQDefaultConfiguration.getDefaultVoteRetries();

private long voteRetryWait = ActiveMQDefaultConfiguration.getDefaultVoteRetryWait();

public ReplicatedPolicyConfiguration() {
}

Expand Down Expand Up @@ -91,4 +95,21 @@ public int getQuorumSize() {
/**
 * Stores the quorum size for this replicated configuration. No validation is done here.
 *
 * @param quorumSize the quorum size to use
 */
public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize;
}


/**
 * @return how many times a quorum vote is retried; initialised from
 *         {@code ActiveMQDefaultConfiguration.getDefaultVoteRetries()}
 */
public int getVoteRetries() {
return voteRetries;
}

/**
 * Stores the number of quorum vote retries. The value is assigned as given, without validation.
 *
 * @param voteRetries how many times a quorum vote is retried
 */
public void setVoteRetries(int voteRetries) {
this.voteRetries = voteRetries;
}

/**
 * Stores the wait between two quorum votes. The value is assigned as given, without validation.
 *
 * @param voteRetryWait wait between votes, in milliseconds
 */
public void setVoteRetryWait(long voteRetryWait) {
this.voteRetryWait = voteRetryWait;
}

/**
 * @return the wait between two quorum votes, in milliseconds; initialised from
 *         {@code ActiveMQDefaultConfiguration.getDefaultVoteRetryWait()}
 */
public long getVoteRetryWait() {
return voteRetryWait;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,10 @@ private ReplicatedPolicyConfiguration createReplicatedHaPolicy(Element policyNod

configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure()));

configuration.setVoteRetries(getInteger(policyNode, "vote-retries", configuration.getVoteRetries(), Validators.MINUS_ONE_OR_GE_ZERO));

configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));

configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));

return configuration;
Expand All @@ -1308,6 +1312,10 @@ private ReplicaPolicyConfiguration createReplicaHaPolicy(Element policyNode) {

configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure()));

configuration.setVoteRetries(getInteger(policyNode, "vote-retries", configuration.getVoteRetries(), Validators.MINUS_ONE_OR_GE_ZERO));

configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));

configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));

return configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public class ReplicaPolicy extends BackupPolicy {

private final NetworkHealthCheck networkHealthCheck;

private int voteRetries;

private long voteRetryWait;

public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck) {
this.networkHealthCheck = networkHealthCheck;
}
Expand All @@ -72,14 +76,18 @@ public ReplicaPolicy(String clusterName,
ScaleDownPolicy scaleDownPolicy,
NetworkHealthCheck networkHealthCheck,
boolean voteOnReplicationFailure,
int quorumSize) {
int quorumSize,
int voteRetries,
long voteRetryWait) {
this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.groupName = groupName;
this.restartBackup = restartBackup;
this.allowFailback = allowFailback;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.quorumSize = quorumSize;
this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait;
this.scaleDownPolicy = scaleDownPolicy;
this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure;
Expand Down Expand Up @@ -115,7 +123,7 @@ public void setMaxSavedReplicatedJournalsSize(int maxSavedReplicatedJournalsSize

/**
 * Returns the {@link ReplicatedPolicy} paired with this replica policy, creating it on first use.
 * <p>
 * The instance is built once from the current field values (including {@code voteRetries} and
 * {@code voteRetryWait}) and then cached; later calls return the same object.
 *
 * @return the cached, lazily created replicated policy
 */
public ReplicatedPolicy getReplicatedPolicy() {
   if (replicatedPolicy == null) {
      // Only the constructor that carries the vote retry settings is used; the older call without
      // voteRetries/voteRetryWait was a stale duplicate whose result was immediately overwritten.
      replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck, voteOnReplicationFailure, quorumSize, voteRetries, voteRetryWait);
   }
   return replicatedPolicy;
}
Expand Down Expand Up @@ -210,4 +218,20 @@ public void setVoteOnReplicationFailure(boolean voteOnReplicationFailure) {
/**
 * @return the current value of the {@code voteOnReplicationFailure} setting of this policy
 */
public boolean isVoteOnReplicationFailure() {
return voteOnReplicationFailure;
}

/**
 * Stores the number of quorum vote retries for this policy. The value is assigned without validation.
 *
 * @param voteRetries how many times a quorum vote is retried
 */
public void setVoteRetries(int voteRetries) {
this.voteRetries = voteRetries;
}

/**
 * Stores the wait between two quorum votes for this policy. The value is assigned without validation.
 *
 * @param voteRetryWait wait between votes, in milliseconds
 */
public void setVoteRetryWait(long voteRetryWait) {
this.voteRetryWait = voteRetryWait;
}

/**
 * @return how many times a quorum vote is retried under this policy
 */
public int getVoteRetries() {
return voteRetries;
}

/**
 * @return the wait between two quorum votes under this policy, in milliseconds
 */
public long getVoteRetryWait() {
return voteRetryWait;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
* */
private int quorumSize;

private int voteRetries;

private long voteRetryWait;

/*
* these are only used as the policy when the server is started as a live after a failover
* */
Expand All @@ -68,14 +72,18 @@ public ReplicatedPolicy(boolean checkForLiveServer,
long initialReplicationSyncTimeout,
NetworkHealthCheck networkHealthCheck,
boolean voteOnReplicationFailure,
int quorumSize) {
int quorumSize,
int voteRetries,
long voteRetryWait) {
this.checkForLiveServer = checkForLiveServer;
this.groupName = groupName;
this.clusterName = clusterName;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure;
this.quorumSize = quorumSize;
this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait;
}

public ReplicatedPolicy(boolean checkForLiveServer,
Expand All @@ -86,7 +94,9 @@ public ReplicatedPolicy(boolean checkForLiveServer,
ReplicaPolicy replicaPolicy,
NetworkHealthCheck networkHealthCheck,
boolean voteOnReplicationFailure,
int quorumSize) {
int quorumSize,
int voteRetries,
long voteRetryWait) {
this.checkForLiveServer = checkForLiveServer;
this.clusterName = clusterName;
this.groupName = groupName;
Expand Down Expand Up @@ -140,6 +150,8 @@ public ReplicaPolicy getReplicaPolicy() {
replicaPolicy = new ReplicaPolicy(networkHealthCheck, this);
replicaPolicy.setQuorumSize(quorumSize);
replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure);
replicaPolicy.setVoteRetries(voteRetries);
replicaPolicy.setVoteRetryWait(voteRetryWait);
if (clusterName != null && clusterName.length() > 0) {
replicaPolicy.setClusterName(clusterName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,8 @@ public Transformer getTransformer() {

protected void fail(final boolean permanently) {
logger.debug(this + "\n\t::fail being called, permanently=" + permanently);

//we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
serverLocator.notifyNodeDown(System.currentTimeMillis(), targetNodeID);
if (queue != null) {
try {
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,22 @@ public QuorumVoteHandler getVoteHandler(SimpleString handler) {
return handlers.get(handler);
}

/**
 * Looks up the live connector of a node in the default cluster topology.
 *
 * @param targetServerID the node ID to look up
 * @return the live {@link TransportConfiguration} of that node, or {@code null} when the topology
 *         has no member with this ID
 */
public TransportConfiguration getLiveTransportConfiguration(String targetServerID) {
   final TopologyMemberImpl topologyMember = clusterController.getDefaultClusterTopology().getMember(targetServerID);
   if (topologyMember == null) {
      return null;
   }
   return topologyMember.getLive();
}

/**
 * Probes a live server by opening a cluster control connection to it and closing it again.
 *
 * @param liveTransportConfiguration the connector of the live server to probe
 * @return {@code true} when connecting and closing both completed without throwing,
 *         {@code false} when anything at all was thrown along the way
 */
public boolean checkLive(TransportConfiguration liveTransportConfiguration) {
   boolean reachable;
   try {
      // connect and immediately close: only the fact that both succeed matters
      clusterController.connectToNode(liveTransportConfiguration).close();
      reachable = true;
   } catch (Throwable failure) {
      // any failure is treated as "could not reach the live"
      reachable = false;
   }
   return reachable;
}


private final class VoteRunnableHolder {

private final QuorumVote quorumVote;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.Topology;
Expand All @@ -33,6 +34,8 @@

public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener {

private TransportConfiguration liveTransportConfiguration;

public enum BACKUP_ACTIVATION {
FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP;
}
Expand All @@ -47,6 +50,12 @@ public enum BACKUP_ACTIVATION {
private final ScheduledExecutorService scheduledPool;
private final int quorumSize;

private final int voteRetries;

private final long voteRetryWait;

private final Object voteGuard = new Object();

private CountDownLatch latch;

private ClientSessionFactoryInternal sessionFactory;
Expand All @@ -68,13 +77,17 @@ public SharedNothingBackupQuorum(StorageManager storageManager,
NodeManager nodeManager,
ScheduledExecutorService scheduledPool,
NetworkHealthCheck networkHealthCheck,
int quorumSize) {
int quorumSize,
int voteRetries,
long voteRetryWait) {
this.storageManager = storageManager;
this.scheduledPool = scheduledPool;
this.quorumSize = quorumSize;
this.latch = new CountDownLatch(1);
this.nodeManager = nodeManager;
this.networkHealthCheck = networkHealthCheck;
this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait;
}

private volatile BACKUP_ACTIVATION signal;
Expand Down Expand Up @@ -129,6 +142,7 @@ public void decideOnAction(Topology topology) {
public void liveIDSet(String liveID) {
// Remember which live node this backup follows; isLiveDown() votes on this ID.
targetServerID = liveID;
nodeManager.setNodeID(liveID);
// Cache the live's connector so isLiveDown() can probe it through quorumManager.checkLive().
// getLiveTransportConfiguration returns null when the topology has no member for this ID.
// NOTE(review): presumably cached here because the node may later be removed from the topology - confirm.
liveTransportConfiguration = quorumManager.getLiveTransportConfiguration(targetServerID);
//now we are replicating we can start waiting for disconnect notifications so we can fail over
// sessionFactory.addFailureListener(this);
}
Expand Down Expand Up @@ -267,20 +281,44 @@ public synchronized void reset() {
* @return the voting decision
*/
private boolean isLiveDown() {
   // Decides whether the live server is down: probe it directly, then ask the quorum to vote,
   // repeating up to voteRetries times. Until a vote says otherwise the live is assumed to be up.

   // A configured quorum size of -1 means "use the maximum cluster size known to the quorum manager".
   final int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;

   // This method keeps working after an interrupt (as it always did), but it now remembers the
   // interrupt and restores the thread's interrupt status on exit instead of silently losing it.
   boolean interrupted = false;
   try {
      synchronized (voteGuard) {
         for (int attempt = 1; attempt <= voteRetries; attempt++) {
            // a quick check to see if the live actually is dead
            if (quorumManager.checkLive(liveTransportConfiguration)) {
               //the live is still alive so we best not failover
               return false;
            }

            //the live is dead so lets vote for quorum; a fresh vote is created for every attempt
            final QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID);
            quorumManager.vote(quorumVote);

            try {
               quorumVote.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
            } catch (InterruptedException interruption) {
               // The best the quorum can do now is to return the latest number it has
               interrupted = true;
            }

            quorumManager.voteComplete(quorumVote);

            final boolean decision = quorumVote.getDecision();
            if (decision) {
               return true;
            }

            // Pause before the next vote; pointless after the last attempt, so skip it there.
            if (attempt < voteRetries) {
               try {
                  // wait() rather than sleep(): the monitor on voteGuard is released while pausing
                  voteGuard.wait(voteRetryWait);
               } catch (InterruptedException interruption) {
                  interrupted = true;
               }
            }
         }
      }
      // every attempt was voted down (or voteRetries allowed no attempt at all)
      return false;
   } finally {
      if (interrupted) {
         Thread.currentThread().interrupt();
      }
   }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void run() {
logger.trace("Entered a synchronized");
if (closed)
return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize());
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait());
activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
}
Expand Down
Loading

0 comments on commit 6067a28

Please sign in to comment.