Skip to content

Commit

Permalink
HDDS-1098. Introduce Retry Policy in Ozone Client. Contributed by Sha…
Browse files Browse the repository at this point in the history
…shikant Banerjee.
  • Loading branch information
bshashikant committed Mar 15, 2019
1 parent ba50a36 commit 155ab6d
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 105 deletions.
Expand Up @@ -47,6 +47,7 @@
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,14 +75,16 @@ public static XceiverClientRatis newXceiverClientRatis(
final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final TimeDuration clientRequestTimeout =
RatisHelper.getClientRequestTimeout(ozoneConf);
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf));
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
retryPolicy, tlsConfig);
retryPolicy, tlsConfig, clientRequestTimeout);
}

private final Pipeline pipeline;
Expand All @@ -90,6 +93,7 @@ public static XceiverClientRatis newXceiverClientRatis(
private final int maxOutstandingRequests;
private final RetryPolicy retryPolicy;
private final GrpcTlsConfig tlsConfig;
private final TimeDuration clientRequestTimeout;

// Map to track commit index at every server
private final ConcurrentHashMap<UUID, Long> commitInfoMap;
Expand All @@ -102,7 +106,7 @@ public static XceiverClientRatis newXceiverClientRatis(
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
int maxOutStandingChunks, RetryPolicy retryPolicy,
GrpcTlsConfig tlsConfig) {
GrpcTlsConfig tlsConfig, TimeDuration timeout) {
super();
this.pipeline = pipeline;
this.rpcType = rpcType;
Expand All @@ -111,6 +115,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
commitInfoMap = new ConcurrentHashMap<>();
watchClient = null;
this.tlsConfig = tlsConfig;
this.clientRequestTimeout = timeout;
}

private void updateCommitInfosMap(
Expand Down Expand Up @@ -160,7 +165,7 @@ public void connect() throws Exception {
// requests to be handled by raft client
if (!client.compareAndSet(null,
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig))) {
maxOutstandingRequests, tlsConfig, clientRequestTimeout))) {
throw new IllegalStateException("Client is already connected.");
}
}
Expand Down Expand Up @@ -243,7 +248,7 @@ public XceiverClientReply watchForCommit(long index, long timeout)
if (watchClient == null) {
watchClient =
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig);
maxOutstandingRequests, tlsConfig, clientRequestTimeout);
}
CompletableFuture<RaftClientReply> replyFuture = watchClient
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
Expand All @@ -260,9 +265,9 @@ public XceiverClientReply watchForCommit(long index, long timeout)
// TODO : need to remove the code to create the new RaftClient instance
// here once the watch request bypassing sliding window in Raft Client
// gets fixed.
watchClient = RatisHelper
.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig);
watchClient =
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig, clientRequestTimeout);
reply = watchClient
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
.get(timeout, TimeUnit.MILLISECONDS);
Expand Down
Expand Up @@ -121,12 +121,12 @@ public final class ScmConfigKeys {
TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
"dfs.ratis.client.request.max.retries";
public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20;
public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY =
"dfs.ratis.client.request.retry.interval";
public static final TimeDuration
DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT =
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY =
"dfs.ratis.server.retry-cache.timeout.duration";
public static final TimeDuration
Expand Down
Expand Up @@ -133,6 +133,11 @@ public final class OzoneConfigKeys {
public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT =
"30s";

public static final String OZONE_CLIENT_MAX_RETRIES =
"ozone.client.max.retries";
public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 5;


// This defines the overall connection limit for the connection pool used in
// RestClient.
public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
Expand Down
33 changes: 26 additions & 7 deletions hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.ozone.OzoneConsts;

import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
Expand Down Expand Up @@ -134,33 +135,51 @@ static RaftGroup newRaftGroup(Pipeline pipeline) {

static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
RetryPolicy retryPolicy, int maxOutStandingRequest,
GrpcTlsConfig tlsConfig) throws IOException {
GrpcTlsConfig tlsConfig, TimeDuration timeout) throws IOException {
return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()),
newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig);
pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig,
timeout);
}

static TimeDuration getClientRequestTimeout(Configuration conf) {
// Set the client requestTimeout
final TimeUnit timeUnit =
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
.getUnit();
final long duration = conf.getTimeDuration(
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
.getDuration(), timeUnit);
final TimeDuration clientRequestTimeout =
TimeDuration.valueOf(duration, timeUnit);
return clientRequestTimeout;
}

static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RetryPolicy retryPolicy, int maxOutstandingRequests,
GrpcTlsConfig tlsConfig) {
GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
maxOutstandingRequests, tlsConfig);
maxOutstandingRequests, tlsConfig, clientRequestTimeout);
}

static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RetryPolicy retryPolicy, int maxOutstandingRequests) {
RetryPolicy retryPolicy, int maxOutstandingRequests,
TimeDuration clientRequestTimeout) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
maxOutstandingRequests, null);
maxOutstandingRequests, null, clientRequestTimeout);
}

static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest,
GrpcTlsConfig tlsConfig) {
GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) {
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
final RaftProperties properties = new RaftProperties();
RaftConfigKeys.Rpc.setType(properties, rpcType);
RaftClientConfigKeys.Rpc
.setRequestTimeout(properties, clientRequestTimeout);

GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE));
Expand Down
16 changes: 13 additions & 3 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Expand Up @@ -231,17 +231,19 @@
<name>dfs.ratis.client.request.timeout.duration</name>
<value>3s</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
<description>The timeout duration for ratis client request.</description>
<description>The timeout duration for ratis client request.It should be
set greater than leader election timeout in Ratis.
</description>
</property>
<property>
<name>dfs.ratis.client.request.max.retries</name>
<value>180</value>
<value>20</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
<description>Number of retries for ratis client request.</description>
</property>
<property>
<name>dfs.ratis.client.request.retry.interval</name>
<value>100ms</value>
<value>500ms</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
<description>Interval between successive retries for a ratis client request.
</description>
Expand Down Expand Up @@ -417,6 +419,14 @@
a particular request getting replayed to all servers.
</description>
</property>
<property>
<name>ozone.client.max.retries</name>
<value>5</value>
<tag>OZONE, CLIENT</tag>
<description>Maximum number of retries by Ozone Client on encountering
exception while writing a key.
</description>
</property>
<property>
<name>ozone.client.protocol</name>
<value>org.apache.hadoop.ozone.client.rpc.RpcClient</value>
Expand Down
Expand Up @@ -44,7 +44,6 @@
import io.opentracing.Scope;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
Expand Down Expand Up @@ -176,7 +175,7 @@ private RaftProperties newRaftProperties(Configuration conf) {
setRaftSegmentPreallocatedSize(conf, properties);

// Set max write buffer size, which is the scm chunk size
final int maxChunkSize = setMaxWriteBuffer(conf, properties);
final int maxChunkSize = setMaxWriteBuffer(properties);
TimeUnit timeUnit;
long duration;

Expand Down Expand Up @@ -329,23 +328,10 @@ private void setServerRequestTimeout(Configuration conf,
.setRequestTimeout(properties, serverRequestTimeout);
}

private int setMaxWriteBuffer(Configuration conf, RaftProperties properties) {
private int setMaxWriteBuffer(RaftProperties properties) {
final int maxChunkSize = OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE;
RaftServerConfigKeys.Log.setWriteBufferSize(properties,
SizeInBytes.valueOf(maxChunkSize));

// Set the client requestTimeout
TimeUnit timeUnit =
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
.getUnit();
long duration = conf.getTimeDuration(
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
.getDuration(), timeUnit);
final TimeDuration clientRequestTimeout =
TimeDuration.valueOf(duration, timeUnit);
RaftClientConfigKeys.Rpc
.setRequestTimeout(properties, clientRequestTimeout);
return maxChunkSize;
}

Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.TimeDuration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -49,6 +50,7 @@
import java.util.Collections;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* Test cases to verify CloseContainerCommandHandler in datanode.
Expand Down Expand Up @@ -289,8 +291,10 @@ private Container createContainer(final Configuration conf,
final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
Collections.singleton(datanodeDetails));
final int maxOutstandingRequests = 100;
final RaftClient client = RatisHelper.newRaftClient(SupportedRpcType.GRPC,
peer, retryPolicy, maxOutstandingRequests, null);
final RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.GRPC, peer, retryPolicy,
maxOutstandingRequests,
TimeDuration.valueOf(3, TimeUnit.SECONDS));
Assert.assertTrue(client.groupAdd(group, peer.getId()).isSuccess());
Thread.sleep(2000);
final ContainerID containerId = ContainerID.valueof(
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -116,9 +117,11 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(
new SecurityConfig(ozoneConf));
final TimeDuration requestTimeout =
RatisHelper.getClientRequestTimeout(ozoneConf);
RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy, maxOutstandingRequests, tlsConfig);
retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout);
client
.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId());
}
Expand All @@ -141,12 +144,13 @@ private static void callRatisRpc(List<DatanodeDetails> datanodes,
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf));

final TimeDuration requestTimeout =
RatisHelper.getClientRequestTimeout(ozoneConf);
datanodes.parallelStream().forEach(d -> {
final RaftPeer p = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy, maxOutstandingRequests, tlsConfig)) {
retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) {
rpc.accept(client, p);
} catch (IOException ioe) {
String errMsg =
Expand Down
Expand Up @@ -20,13 +20,16 @@
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.rest.response.*;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.RaftRetryFailureException;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** A utility class for OzoneClient. */
Expand Down Expand Up @@ -122,6 +125,14 @@ public static KeyInfoDetails asKeyInfoDetails(OzoneKeyDetails key) {
return keyInfo;
}

public static RetryPolicy createRetryPolicy(int maxRetryCount) {
// just retry without sleep
RetryPolicy retryPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, 0,
TimeUnit.MILLISECONDS);
return retryPolicy;
}

public static List<Class<? extends Exception>> getExceptionList() {
return EXCEPTION_LIST;
}
Expand Down

0 comments on commit 155ab6d

Please sign in to comment.