Skip to content

Commit

Permalink
Enhance and stabilize the quorum elect command
Browse files Browse the repository at this point in the history
### Why are the changes needed?
The `quorum elect` command can be slow, can hang, can display error
messages incorrectly, and has many small shortcomings. This PR tackles
those problems.

### Does this PR introduce any user facing changes?
No

pr-link: #14008
change-id: cid-1181ae3ef6b47787e5412c15686d5f0f5d32ba25
  • Loading branch information
jenoudet committed Sep 8, 2021
1 parent 22ecb2c commit e79fcdd
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 46 deletions.
Expand Up @@ -170,6 +170,8 @@ public class RaftJournalSystem extends AbstractJournalSystem {
private final RaftJournalConfiguration mConf;
/** Controls whether state machine can take snapshots. */
private final AtomicBoolean mSnapshotAllowed;
/** Controls whether or not the quorum leadership can be transferred. */
private final AtomicBoolean mTransferLeaderAllowed;

private final Map<String, RatisDropwizardExports> mRatisMetricsMap =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -234,6 +236,7 @@ private RaftJournalSystem(RaftJournalConfiguration conf) {
mConf = processRaftConfiguration(conf);
mJournals = new ConcurrentHashMap<>();
mSnapshotAllowed = new AtomicBoolean(true);
mTransferLeaderAllowed = new AtomicBoolean(false);
mPrimarySelector = new RaftPrimarySelector();
mAsyncJournalWriter = new AtomicReference<>();
try {
Expand Down Expand Up @@ -479,6 +482,7 @@ public synchronized void gainPrimacy() {
mRaftJournalWriter = new RaftJournalWriter(nextSN, client);
mAsyncJournalWriter
.set(new AsyncJournalWriter(mRaftJournalWriter, () -> getJournalSinks(null)));
mTransferLeaderAllowed.set(true);
}

@Override
Expand All @@ -487,6 +491,7 @@ public synchronized void losePrimacy() {
// Avoid duplicate shut down Ratis server
return;
}
mTransferLeaderAllowed.set(false);
try {
// Close async writer first to flush pending entries.
mAsyncJournalWriter.get().close();
Expand Down Expand Up @@ -923,7 +928,7 @@ public synchronized void resetPriorities() throws IOException {
LOG.info("Resetting RaftPeer priorities");
try (RaftClient client = createClient()) {
RaftClientReply reply = client.admin().setConfiguration(resetPeers);
processReply(reply);
processReply(reply, "failed to reset master priorities to 1");
}
}

Expand All @@ -934,19 +939,26 @@ public synchronized void resetPriorities() throws IOException {
* @throws IOException if error occurred while performing the operation
*/
public synchronized void transferLeadership(NetAddress newLeaderNetAddress) throws IOException {
final boolean allowed = mTransferLeaderAllowed.getAndSet(false);
if (!allowed) {
throw new IOException("transfer is not allowed at the moment because the master is "
+ (mRaftJournalWriter == null ? "still gaining primacy" : "already transferring the "
+ "leadership"));
}
InetSocketAddress serverAddress = InetSocketAddress
.createUnresolved(newLeaderNetAddress.getHost(), newLeaderNetAddress.getRpcPort());
List<RaftPeer> oldPeers = new ArrayList<>(mRaftGroup.getPeers());
// The NetUtil function is used by Ratis to convert InetSocketAddress to string
String strAddr = NetUtils.address2String(serverAddress);
// if you cannot find the address in the quorum, throw exception.
if (oldPeers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) {
mTransferLeaderAllowed.set(true);
throw new IOException(String.format("<%s> is not part of the quorum <%s>.",
strAddr, oldPeers.stream().map(RaftPeer::getAddress).collect(Collectors.toList())));
}

RaftPeerId newLeaderPeerId = RaftJournalUtils.getPeerId(serverAddress);
// --- the change in priorities seems to be necessary otherwise the transfer fails ---
/* update priorities to enable transfer */
List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
for (RaftPeer peer : oldPeers) {
peersWithNewPriorities.add(
Expand All @@ -955,15 +967,13 @@ public synchronized void transferLeadership(NetAddress newLeaderNetAddress) thro
.build()
);
}
// --- end of updating priorities ---
try (RaftClient client = createClient()) {
String stringPeers = "[" + peersWithNewPriorities.stream().map(RaftPeer::toString)
.collect(Collectors.joining(", ")) + "]";
LOG.info("Applying new peer state before transferring leadership: {}", stringPeers);
// set peers to have new priorities
RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriorities);
processReply(reply);
// transfer leadership
processReply(reply, "failed to set master priorities before initiating election");
/* transfer leadership */
LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>",
serverAddress, newLeaderPeerId);
// fire and forget: need to immediately return as the master will shut down its RPC servers
Expand All @@ -973,14 +983,20 @@ public synchronized void transferLeadership(NetAddress newLeaderNetAddress) thro
new Thread(() -> {
try {
Thread.sleep(SLEEP_TIME_MS);
client.admin().transferLeadership(newLeaderPeerId, TRANSFER_LEADER_WAIT_MS);
RaftClientReply reply1 = client.admin().transferLeadership(newLeaderPeerId,
TRANSFER_LEADER_WAIT_MS);
processReply(reply1, "election failed");
} catch (Throwable t) {
LOG.error("caught an error: {}", t.getMessage());
LOG.error("caught an error when executing transfer: {}", t.getMessage());
// we only allow transfers again if the transfer is unsuccessful: a success means it
// will soon lose primacy
mTransferLeaderAllowed.set(true);
/* checking the transfer happens in {@link QuorumElectCommand} */
}
}).start();
LOG.info("Transferring leadership initiated");
} catch (Throwable t) {
mTransferLeaderAllowed.set(true);
throw new IOException(t);
}
}
Expand All @@ -989,11 +1005,13 @@ public synchronized void transferLeadership(NetAddress newLeaderNetAddress) thro
* @param reply from the ratis operation
* @throws IOException
*/
private void processReply(RaftClientReply reply) throws IOException {
private void processReply(RaftClientReply reply, String msgToUser) throws IOException {
if (!reply.isSuccess()) {
throw reply.getException() != null
IOException ioe = reply.getException() != null
? reply.getException()
: new IOException(String.format("reply <%s> failed", reply));
LOG.error("{}. Error: {}", msgToUser, ioe);
throw new IOException(msgToUser);
}
}

Expand Down
Expand Up @@ -17,18 +17,21 @@
import alluxio.client.journal.JournalMasterClient;
import alluxio.conf.AlluxioConfiguration;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.NetAddress;
import alluxio.master.MasterInquireClient;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeoutException;

/**
* Command for transferring the leadership to another master within a quorum.
Expand All @@ -37,10 +40,8 @@ public class QuorumElectCommand extends AbstractFsAdminCommand {

public static final String ADDRESS_OPTION_NAME = "address";

public static final String TRANSFER_SUCCESS = "Transferred leadership to server: %s";
public static final String TRANSFER_FAILED = "Leadership was not transferred to %s: %s";
public static final String RESET_SUCCESS = "Quorum priorities were reset to 1";
public static final String RESET_FAILED = "Quorum priorities failed to be reset: %s";
public static final String TRANSFER_SUCCESS = "Successfully elected %s as the new leader";
public static final String TRANSFER_FAILED = "Failed to elect %s as the new leader: %s";

private final AlluxioConfiguration mConf;

Expand All @@ -66,39 +67,30 @@ public int run(CommandLine cl) throws IOException {
JournalMasterClient jmClient = mMasterJournalMasterClient;
String serverAddress = cl.getOptionValue(ADDRESS_OPTION_NAME);
NetAddress address = QuorumCommand.stringToAddress(serverAddress);

jmClient.transferLeadership(address);

MasterInquireClient inquireClient = MasterInquireClient.Factory
.create(mConf, FileSystemContext.create(mConf).getClientContext().getUserState());
boolean success = true;
// wait for confirmation of leadership transfer
try {
CommonUtils.waitFor("Waiting for leadership transfer to finalize", () -> {
InetSocketAddress leaderAddress;
jmClient.transferLeadership(address);
// wait for confirmation of leadership transfer
final int TIMEOUT_3MIN = 3 * 60 * 1000; // in milliseconds
CommonUtils.waitFor("Waiting for election to finalize", () -> {
try {
leaderAddress = inquireClient.getPrimaryRpcAddress();
InetSocketAddress leaderAddress = inquireClient.getPrimaryRpcAddress();
return leaderAddress.getHostName().equals(address.getHost());
} catch (UnavailableException e) {
return false;
}
return leaderAddress.getHostName().equals(address.getHost());
});
}, WaitForOptions.defaults().setTimeoutMs(TIMEOUT_3MIN));

mPrintStream.println(String.format(TRANSFER_SUCCESS, serverAddress));
} catch (Exception e) {
success = false;
mPrintStream.println(String.format(TRANSFER_FAILED, serverAddress, e));
}
// Resetting RaftPeer priorities using a separate RPC because the old leader has shut down
// its RPC server. We want to reset them regardless of transfer success because the original
// setting of priorities may have succeeded while the transfer might not have.
try {
jmClient.resetPriorities();
mPrintStream.println(RESET_SUCCESS);
} catch (Exception e) {
success = false;
mPrintStream.println(String.format(RESET_FAILED, e));
return 0;
} catch (AlluxioStatusException e) {
mPrintStream.println(String.format(TRANSFER_FAILED, serverAddress, e.getMessage()));
} catch (InterruptedException | TimeoutException e) {
mPrintStream.println(String.format(TRANSFER_FAILED, serverAddress, "the election was "
+ "initiated but never completed"));
}
return success ? 0 : -1;
return -1;
}

@Override
Expand Down
Expand Up @@ -196,7 +196,6 @@ public void elect() throws Exception {
.build();
mCluster.start();

String output;
try (FileSystemAdminShell shell = new FileSystemAdminShell(ServerConfiguration.global())) {
int newLeaderIdx = (mCluster.getPrimaryMasterIndex(MASTER_INDEX_WAIT_TIME) + 1) % numMasters;
// `getPrimaryMasterIndex` uses the same `mMasterAddresses` variable as getMasterAddresses
Expand All @@ -207,9 +206,8 @@ public void elect() throws Exception {

mOutput.reset();
shell.run("journal", "quorum", "elect", "-address" , newLeaderAddr);
output = mOutput.toString().trim();
String expected = String.format(QuorumElectCommand.TRANSFER_SUCCESS + "\n"
+ QuorumElectCommand.RESET_SUCCESS, newLeaderAddr);
String output = mOutput.toString().trim();
String expected = String.format(QuorumElectCommand.TRANSFER_SUCCESS, newLeaderAddr);
Assert.assertEquals(expected, output);
}
mCluster.notifySuccess();
Expand Down
Expand Up @@ -24,6 +24,7 @@
import alluxio.conf.ServerConfiguration;
import alluxio.exception.FileAlreadyExistsException;
import alluxio.exception.FileDoesNotExistException;
import alluxio.grpc.GetQuorumInfoPResponse;
import alluxio.grpc.NetAddress;
import alluxio.grpc.QuorumServerInfo;
import alluxio.grpc.QuorumServerState;
Expand Down Expand Up @@ -299,7 +300,46 @@ public void transferLeadership() throws Exception {
}

@Test
public void resetPriorities() throws Exception {
public void repeatedTransferLeadership() throws Exception {
  // Performs NUM_MASTERS consecutive leadership transfers, each one targeting the master that
  // follows the current primary in the cluster's address list, and waits for every transfer to
  // take effect before starting the next one.
  final int primaryLookupTimeoutMs = 5_000;
  final int transferTimeoutMs = 3 * 60 * 1000; // three minutes

  mCluster = MultiProcessCluster
      .newBuilder(PortCoordination.EMBEDDED_JOURNAL_FAILOVER)
      .setClusterName("TransferLeadership")
      .setNumMasters(NUM_MASTERS)
      .setNumWorkers(0)
      .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED.toString())
      .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
      .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "750ms")
      .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "1500ms")
      .build();
  mCluster.start();

  int transfersDone = 0;
  while (transfersDone < NUM_MASTERS) {
    // The primary index and getMasterAddresses() refer to the same ordering of masters, so the
    // index computed here can be used directly to look up the target's address.
    int targetIdx = (mCluster.getPrimaryMasterIndex(primaryLookupTimeoutMs) + 1) % NUM_MASTERS;
    MasterNetAddress targetAddr = mCluster.getMasterAddresses().get(targetIdx);
    NetAddress targetNetAddr = NetAddress.newBuilder()
        .setHost(targetAddr.getHostname())
        .setRpcPort(targetAddr.getEmbeddedJournalPort())
        .build();

    mCluster.getJournalMasterClientForMaster().transferLeadership(targetNetAddr);

    WaitForOptions waitOptions = WaitForOptions.defaults().setTimeoutMs(transferTimeoutMs);
    CommonUtils.waitFor("leadership to transfer", () -> {
      // Done once the current primary's address equals the address of the designated target.
      try {
        return mCluster.getMasterAddresses()
            .get(mCluster.getPrimaryMasterIndex(primaryLookupTimeoutMs))
            .equals(targetAddr);
      } catch (Exception exc) {
        throw new RuntimeException(exc);
      }
    }, waitOptions);

    transfersDone++;
  }
  mCluster.notifySuccess();
}

@Test
public void ensureAutoResetPriorities() throws Exception {
final int MASTER_INDEX_WAIT_TIME = 5_000;
mCluster = MultiProcessCluster.newBuilder(PortCoordination.EMBEDDED_JOURNAL_FAILOVER)
.setClusterName("TransferLeadership")
.setNumMasters(NUM_MASTERS)
Expand All @@ -311,10 +351,65 @@ public void resetPriorities() throws Exception {
.build();
mCluster.start();

for (int i = 0; i < NUM_MASTERS; i++) {
int newLeaderIdx = (mCluster.getPrimaryMasterIndex(MASTER_INDEX_WAIT_TIME) + 1) % NUM_MASTERS;
// `getPrimaryMasterIndex` uses the same `mMasterAddresses` variable as getMasterAddresses
// we can therefore access to the new leader's address this way
MasterNetAddress newLeaderAddr = mCluster.getMasterAddresses().get(newLeaderIdx);
NetAddress netAddress = NetAddress.newBuilder().setHost(newLeaderAddr.getHostname())
.setRpcPort(newLeaderAddr.getEmbeddedJournalPort()).build();

mCluster.getJournalMasterClientForMaster().transferLeadership(netAddress);

final int TIMEOUT_3MIN = 3 * 60 * 1000; // in ms
CommonUtils.waitFor("leadership to transfer", () -> {
try {
// wait until the address of the new leader matches the one designated as the new leader
return mCluster.getMasterAddresses()
.get(mCluster.getPrimaryMasterIndex(MASTER_INDEX_WAIT_TIME)).equals(newLeaderAddr);
} catch (Exception exc) {
throw new RuntimeException(exc);
}
}, WaitForOptions.defaults().setTimeoutMs(TIMEOUT_3MIN));

GetQuorumInfoPResponse info = mCluster.getJournalMasterClientForMaster().getQuorumInfo();
// confirms that master priorities get reset to 0 for the new leading master and 1 for the
// follower masters (this behavior is default within Apache Ratis 2.0)
Assert.assertTrue(info.getServerInfoList().stream().allMatch(masterInfo ->
masterInfo.getPriority() == (masterInfo.getIsLeader() ? 0 : 1)
));
}
mCluster.notifySuccess();
}

@Test
public void transferLeadershipWhenAlreadyTransferring() throws Exception {
final int MASTER_INDEX_WAIT_TIME = 5_000;
mCluster = MultiProcessCluster.newBuilder(PortCoordination.EMBEDDED_JOURNAL_FAILOVER)
.setClusterName("TransferLeadership")
.setNumMasters(NUM_MASTERS)
.setNumWorkers(0)
.addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED.toString())
.addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
.addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "750ms")
.addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "1500ms")
.build();
mCluster.start();

int newLeaderIdx = (mCluster.getPrimaryMasterIndex(MASTER_INDEX_WAIT_TIME) + 1) % NUM_MASTERS;
// `getPrimaryMasterIndex` uses the same `mMasterAddresses` variable as getMasterAddresses
// we can therefore access to the new leader's address this way
MasterNetAddress newLeaderAddr = mCluster.getMasterAddresses().get(newLeaderIdx);
NetAddress netAddress = NetAddress.newBuilder().setHost(newLeaderAddr.getHostname())
.setRpcPort(newLeaderAddr.getEmbeddedJournalPort()).build();

mCluster.getJournalMasterClientForMaster().transferLeadership(netAddress);
try {
mCluster.getJournalMasterClientForMaster().resetPriorities();
} catch (Exception e) {
throw new RuntimeException(e);
// this second call should throw an exception
mCluster.getJournalMasterClientForMaster().transferLeadership(netAddress);
Assert.fail("Should have thrown exception");
} catch (IOException ioe) {
// expected exception thrown
}
mCluster.notifySuccess();
}
Expand Down

0 comments on commit e79fcdd

Please sign in to comment.