Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.hadoop.ozone.om.exceptions;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.RaftPeerId;
Expand All @@ -34,50 +32,38 @@ public class OMNotLeaderException extends IOException {

private final String currentPeerId;
private final String leaderPeerId;
private static final Pattern CURRENT_PEER_ID_PATTERN =
Pattern.compile("OM:(.*) is not the leader[.]+.*", Pattern.DOTALL);
private static final Pattern SUGGESTED_LEADER_PATTERN =
Pattern.compile(".*Suggested leader is OM:([^.]*).*", Pattern.DOTALL);
private final String leaderAddress;

public OMNotLeaderException(RaftPeerId currentPeerId) {
super("OM:" + currentPeerId + " is not the leader. Could not " +
"determine the leader node.");
this.currentPeerId = currentPeerId.toString();
this.leaderPeerId = null;
this.leaderAddress = null;
}

public OMNotLeaderException(RaftPeerId currentPeerId,
RaftPeerId suggestedLeaderPeerId) {
this(currentPeerId, suggestedLeaderPeerId, null);
}

public OMNotLeaderException(RaftPeerId currentPeerId,
RaftPeerId suggestedLeaderPeerId, String suggestedLeaderAddress) {
super("OM:" + currentPeerId + " is not the leader. Suggested leader is" +
" OM:" + suggestedLeaderPeerId + ".");
" OM:" + suggestedLeaderPeerId + "[" + suggestedLeaderAddress + "].");
this.currentPeerId = currentPeerId.toString();
this.leaderPeerId = suggestedLeaderPeerId.toString();
}

public OMNotLeaderException(String message) {
super(message);

Matcher currentLeaderMatcher = CURRENT_PEER_ID_PATTERN.matcher(message);
if (currentLeaderMatcher.matches()) {
this.currentPeerId = currentLeaderMatcher.group(1);

Matcher suggestedLeaderMatcher =
SUGGESTED_LEADER_PATTERN.matcher(message);
if (suggestedLeaderMatcher.matches()) {
this.leaderPeerId = suggestedLeaderMatcher.group(1);
} else {
this.leaderPeerId = null;
}
} else {
this.currentPeerId = null;
this.leaderPeerId = null;
}
this.leaderAddress = suggestedLeaderAddress;
}

public String getSuggestedLeaderNodeId() {
return leaderPeerId;
}

public String getSuggestedLeaderAddress() {
return leaderAddress;
}

/**
* Convert {@link NotLeaderException} to {@link OMNotLeaderException}.
* @param notLeaderException
Expand All @@ -89,10 +75,13 @@ public static OMNotLeaderException convertToOMNotLeaderException(
RaftPeerId suggestedLeader =
notLeaderException.getSuggestedLeader() != null ?
notLeaderException.getSuggestedLeader().getId() : null;
String suggestedLeaderAddress =
notLeaderException.getSuggestedLeader() != null ?
notLeaderException.getSuggestedLeader().getAddress() : null;
OMNotLeaderException omNotLeaderException;
if (suggestedLeader != null) {
omNotLeaderException = new OMNotLeaderException(currentPeer,
suggestedLeader);
suggestedLeader, suggestedLeaderAddress);
} else {
omNotLeaderException =
new OMNotLeaderException(currentPeer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ha.ConfUtils;
Expand Down Expand Up @@ -71,6 +72,7 @@ protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
List<String> omNodeIDList = new ArrayList<>();
Map<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();

for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
Expand All @@ -85,14 +87,16 @@ protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
}
if (hostaddr.isPresent()) {
int port = hostport.orElse(config
.getObject(GrpcOmTransport
.GrpcOmTransportConfig.class)
.getPort());
ProxyInfo<T> proxyInfo =
new ProxyInfo<>(createOMProxy(),
hostaddr.get() + ":"
+ hostport.orElse(config
.getObject(GrpcOmTransport
.GrpcOmTransportConfig.class)
.getPort()));
hostaddr.get() + ":" + port);
omProxies.put(nodeId, proxyInfo);
omNodeAddressMap.put(nodeId,
NetUtils.createSocketAddr(proxyInfo.proxyInfo));
} else {
LOG.error("expected host address not defined for: {}", rpcAddrKey);
throw new ConfigurationException(rpcAddrKey + "is not defined");
Expand All @@ -107,6 +111,7 @@ protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
}
setOmProxies(omProxies);
setOmNodeIDList(omNodeIDList);
setOmNodeAddressMap(omNodeAddressMap);
}

private T createOMProxy() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
this.omProxyInfos = new HashMap<>();
List<String> omNodeIDList = new ArrayList<>();
Map<String, InetSocketAddress> omNodeAddressMap = new HashMap<>();

Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
omSvcId);
Expand All @@ -112,6 +113,7 @@ protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
omProxies.put(nodeId, null);
omProxyInfos.put(nodeId, omProxyInfo);
omNodeIDList.add(nodeId);
omNodeAddressMap.put(nodeId, omProxyInfo.getAddress());
} else {
LOG.error("Failed to create OM proxy for {} at address {}",
nodeId, rpcAddrStr);
Expand All @@ -125,6 +127,7 @@ protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
}
setOmProxies(omProxies);
setOmNodeIDList(omNodeIDList);
setOmNodeAddressMap(omNodeAddressMap);
}

private T createOMProxy(InetSocketAddress omAddress) throws IOException {
Expand Down Expand Up @@ -226,14 +229,21 @@ public List<OMProxyInfo> getOMProxyInfos() {
return new ArrayList<OMProxyInfo>(omProxyInfos.values());
}

@VisibleForTesting
public Map<String, OMProxyInfo> getOMProxyInfoMap() {
return omProxyInfos;
}

@VisibleForTesting
protected void setProxiesForTesting(
Map<String, ProxyInfo<T>> setOMProxies,
Map<String, OMProxyInfo> setOMProxyInfos,
List<String> setOMNodeIDList) {
List<String> setOMNodeIDList,
Map<String, InetSocketAddress> setOMNodeAddress) {
setOmProxies(setOMProxies);
this.omProxyInfos = setOMProxyInfos;
setOmNodeIDList(setOMNodeIDList);
setOmNodeAddressMap(setOMNodeAddress);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand All @@ -65,6 +66,7 @@ public abstract class OMFailoverProxyProviderBase<T> implements
// Map of OMNodeID to its proxy
private Map<String, ProxyInfo<T>> omProxies;
private List<String> omNodeIDList;
private Map<String, InetSocketAddress> omNodeAddressMap;

private String currentProxyOMNodeId;
private int currentProxyIndex;
Expand Down Expand Up @@ -98,6 +100,7 @@ public OMFailoverProxyProviderBase(ConfigurationSource configuration,
loadOMClientConfigs(conf, omServiceId);
Preconditions.checkNotNull(omProxies);
Preconditions.checkNotNull(omNodeIDList);
Preconditions.checkNotNull(omNodeAddressMap);

nextProxyIndex = 0;
nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
Expand Down Expand Up @@ -172,12 +175,22 @@ public RetryAction shouldRetry(Exception exception, int retries,
OMNotLeaderException notLeaderException =
getNotLeaderException(exception);
if (notLeaderException != null) {
// TODO: NotLeaderException should include the host
// address of the suggested leader along with the nodeID.
// Failing over just based on nodeID is not very robust.

// Prepare the next OM to be tried. This will help with calculation
// of the wait times needed get creating the retryAction.
String suggestedLeaderAddress =
notLeaderException.getSuggestedLeaderAddress();
String suggestedNodeId =
notLeaderException.getSuggestedLeaderNodeId();
if (suggestedLeaderAddress != null &&
suggestedNodeId != null &&
omNodeAddressMap.containsKey(suggestedNodeId) &&
omNodeAddressMap.get(suggestedNodeId).toString()
.equals(suggestedLeaderAddress)) {
setNextOmProxy(suggestedNodeId);
return getRetryAction(RetryDecision.FAILOVER_AND_RETRY,
failovers);
}

selectNextOmProxy();
return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
}
Expand Down Expand Up @@ -405,4 +418,9 @@ protected synchronized List<String> getOmNodeIDList() {
return omNodeIDList;
}

protected synchronized void setOmNodeAddressMap(
Map<String, InetSocketAddress> map) {
this.omNodeAddressMap = map;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneTestUtils;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
Expand Down Expand Up @@ -47,6 +48,7 @@
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.server.RaftServer;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import javax.management.MBeanInfo;
Expand Down Expand Up @@ -283,6 +285,53 @@ public void testOMRetryProxy() throws Exception {
}
}

/**
* Choose a follower to send the request, the returned exception should
* include the suggested leader node.
*/
@Test
public void testFailoverWithSuggestedLeader() throws Exception {
HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
OmFailoverProxyUtil
.getFailoverProxyProvider(getObjectStore().getClientProxy());

// Make sure All OMs are ready.
createVolumeTest(true);

// The OMFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
String leaderOMAddress = ((OMProxyInfo)
omFailoverProxyProvider.getOMProxyInfoMap().get(leaderOMNodeId))
.getAddress().getAddress().toString();
OzoneManager followerOM = null;
for (OzoneManager om: getCluster().getOzoneManagersList()) {
if (!om.isLeaderReady()) {
followerOM = om;
break;
}
}
assert followerOM != null;
Assertions.assertSame(followerOM.getOmRatisServer().checkLeaderStatus(),
OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER);

OzoneManagerProtocolProtos.OMRequest writeRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.ListVolume)
.setVersion(ClientVersion.CURRENT_VERSION)
.setClientId(UUID.randomUUID().toString())
.build();

try {
OzoneManagerProtocolProtos.OMResponse
omResponse = followerOM.getOmServerProtocol()
.submitRequest(null, writeRequest);
Assertions.fail("Test failure with NotLeaderException");
} catch (Exception ex) {
GenericTestUtils.assertExceptionContains("Suggested leader is OM:" +
leaderOMNodeId + "[" + leaderOMAddress + "]", ex);
}
}

@Test
@Flaky("HDDS-6644")
public void testReadRequest() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,5 @@ public void testIncrementalWaitTimeWithSameNodeFailover() throws Exception {
Assert.assertEquals((numTimesTriedToSameNode + 1) * waitBetweenRetries,
omFailoverProxyProvider.getWaitTime());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3891,6 +3891,8 @@ public void checkLeaderStatus() throws OMNotLeaderException,
OzoneManagerRatisServer.RaftServerStatus raftServerStatus =
omRatisServer.checkLeaderStatus();
RaftPeerId raftPeerId = omRatisServer.getRaftPeerId();
RaftPeerId raftLeaderId = omRatisServer.getRaftLeaderId();
String raftLeaderAddress = omRatisServer.getRaftLeaderAddress();

switch (raftServerStatus) {
case LEADER_AND_READY: return;
Expand All @@ -3899,9 +3901,9 @@ public void checkLeaderStatus() throws OMNotLeaderException,
raftPeerId.toString() + " is Leader " +
"but not ready to process request yet.");
case NOT_LEADER:
// TODO: Set suggest leaderID. Right now, client is not using suggest
// leaderID. Need to fix this.
throw new OMNotLeaderException(raftPeerId);
throw raftLeaderId == null ? new OMNotLeaderException(raftPeerId) :
new OMNotLeaderException(raftPeerId, raftLeaderId,
raftLeaderAddress);
default: throw new IllegalStateException(
"Unknown Ratis Server state: " + raftServerStatus);
}
Expand Down
Loading