Skip to content

Commit

Permalink
HDDS-912. Update ozone to latest ratis snapshot build (0.4.0-3b0be02-…
Browse files Browse the repository at this point in the history
…SNAPSHOT). Contributed by Lokesh Jain.
  • Loading branch information
bshashikant committed Dec 18, 2018
1 parent 4ff1c46 commit b51e9e4
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 42 deletions.
Expand Up @@ -78,7 +78,7 @@ public final class ScmConfigKeys {
public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY =
"dfs.container.ratis.segment.preallocated.size";
public static final String
DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = "128MB";
DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = "16KB";
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT =
"dfs.container.ratis.statemachinedata.sync.timeout";
Expand All @@ -90,10 +90,23 @@ public final class ScmConfigKeys {
"dfs.container.ratis.statemachinedata.sync.retries";
public static final int
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES_DEFAULT = -1;
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE =
"dfs.container.ratis.log.queue.size";
public static final int DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE_DEFAULT = 128;

public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS =
"dfs.container.ratis.log.queue.num-elements";
public static final int DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT =
1024;
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT =
"dfs.container.ratis.log.queue.byte-limit";
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT =
"4GB";
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS =
"dfs.container.ratis.log.appender.queue.num-elements";
public static final int
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT = 1;
public static final String DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT =
"dfs.container.ratis.log.appender.queue.byte-limit";
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
// expiry interval stateMachineData cache entry inside containerStateMachine
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
Expand Down
Expand Up @@ -288,10 +288,25 @@ public final class OzoneConfigKeys {
public static final int
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES_DEFAULT;
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE;
public static final int DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE_DEFAULT;
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS;
public static final int DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT;
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT;
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT;
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS;
public static final int
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT;
public static final String DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT;
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT;
public static final String DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY =
ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY;
public static final TimeDuration
Expand Down
Expand Up @@ -170,7 +170,7 @@ static RetryPolicy createRetryPolicy(Configuration conf) {
long retryInterval = conf.getTimeDuration(OzoneConfigKeys.
DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, OzoneConfigKeys.
DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT
.toInt(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
.toIntExact(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
TimeDuration sleepDuration =
TimeDuration.valueOf(retryInterval, TimeUnit.MILLISECONDS);
RetryPolicy retryPolicy = RetryPolicies
Expand Down
51 changes: 44 additions & 7 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Expand Up @@ -76,10 +76,32 @@
</description>
</property>
<property>
<name>dfs.container.ratis.log.queue.size</name>
<value>128</value>
<name>dfs.container.ratis.log.queue.num-elements</name>
<value>1024</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<description>Number of operation pending with Raft's Log Worker.
<description>Limit for the number of operations in Ratis Log Worker.
</description>
</property>
<property>
<name>dfs.container.ratis.log.queue.byte-limit</name>
<value>4GB</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<description>Byte limit for Ratis Log Worker queue.
</description>
</property>
<property>
<name>dfs.container.ratis.log.appender.queue.num-elements</name>
<value>1</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<description>Limit for number of append entries in ratis leader's
log appender queue.
</description>
</property>
<property>
<name>dfs.container.ratis.log.appender.queue.byte-limit</name>
<value>32MB</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<description>Byte limit for ratis leader's log appender queue.
</description>
</property>
<property>
Expand Down Expand Up @@ -191,10 +213,10 @@
</property>
<property>
<name>dfs.container.ratis.segment.preallocated.size</name>
<value>128MB</value>
<value>16KB</value>
<tag>OZONE, RATIS, PERFORMANCE</tag>
<description>The size of the buffer which is preallocated for raft segment
used by Apache Ratis on datanodes.(128 MB by default)
used by Apache Ratis on datanodes. (16 KB by default)
</description>
</property>
<property>
Expand Down Expand Up @@ -1476,10 +1498,25 @@

<property>
<name>ozone.om.ratis.segment.preallocated.size</name>
<value>128MB</value>
<value>16KB</value>
<tag>OZONE, OM, RATIS, PERFORMANCE</tag>
<description>The size of the buffer which is preallocated for raft segment
used by Apache Ratis on OM.(128 MB by default)
used by Apache Ratis on OM. (16 KB by default)
</description>
</property>

<property>
<name>ozone.om.ratis.log.appender.queue.num-elements</name>
<value>1024</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<description>Limit for number of append entries in the OM Ratis leader's log appender queue.
</description>
</property>
<property>
<name>ozone.om.ratis.log.appender.queue.byte-limit</name>
<value>32MB</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<description>Byte limit for the OM Ratis leader's log appender queue.
</description>
</property>

Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
Expand Down Expand Up @@ -388,11 +389,12 @@ private ByteString readStateMachineData(
ContainerCommandRequestProto requestProto, long term, long index) {
WriteChunkRequestProto writeChunkRequestProto =
requestProto.getWriteChunk();
ContainerProtos.ChunkInfo chunkInfo = writeChunkRequestProto.getChunkData();
// prepare the chunk to be read
ReadChunkRequestProto.Builder readChunkRequestProto =
ReadChunkRequestProto.newBuilder()
.setBlockID(writeChunkRequestProto.getBlockID())
.setChunkData(writeChunkRequestProto.getChunkData());
.setChunkData(chunkInfo);
ContainerCommandRequestProto dataContainerCommandProto =
ContainerCommandRequestProto.newBuilder(requestProto)
.setCmdType(Type.ReadChunk)
Expand All @@ -411,7 +413,11 @@ private ByteString readStateMachineData(

ByteString data = responseProto.getData();
// assert that the response has data in it.
Preconditions.checkNotNull(data);
Preconditions
.checkNotNull(data, "read chunk data is null for chunk:" + chunkInfo);
Preconditions.checkState(data.size() == chunkInfo.getLen(), String.format(
"read chunk len=%d does not match chunk expected len=%d for chunk:%s",
data.size(), chunkInfo.getLen(), chunkInfo));
return data;
}

Expand Down
Expand Up @@ -175,8 +175,19 @@ private RaftProperties newRaftProperties(Configuration conf) {
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
StorageUnit.BYTES);
RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
SizeInBytes.valueOf(raftSegmentPreallocatedSize));
int logAppenderQueueNumElements = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS,
OzoneConfigKeys
.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT);
final int logAppenderQueueByteLimit = (int) conf.getStorageSize(
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
OzoneConfigKeys
.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
RaftServerConfigKeys.Log.Appender
.setBufferElementLimit(properties, logAppenderQueueNumElements);
RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
SizeInBytes.valueOf(logAppenderQueueByteLimit));
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
SizeInBytes.valueOf(raftSegmentPreallocatedSize));

Expand Down Expand Up @@ -255,8 +266,6 @@ private RaftProperties newRaftProperties(Configuration conf) {
leaderElectionMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
// Enable batch append on raft server
RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);

// Set the maximum cache segments
RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
Expand Down Expand Up @@ -299,10 +308,15 @@ private RaftProperties newRaftProperties(Configuration conf) {
setAutoTriggerEnabled(properties, true);
RaftServerConfigKeys.Snapshot.
setAutoTriggerThreshold(properties, snapshotThreshold);
int logQueueSize =
conf.getInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE,
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE_DEFAULT);
RaftServerConfigKeys.Log.setQueueSize(properties, logQueueSize);
int logQueueNumElements =
conf.getInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS,
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT);
final int logQueueByteLimit = (int) conf.getStorageSize(
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
RaftServerConfigKeys.Log.setElementLimit(properties, logQueueNumElements);
RaftServerConfigKeys.Log.setByteLimit(properties, logQueueByteLimit);

int numSyncRetries = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
Expand Down Expand Up @@ -409,7 +423,7 @@ public void submitRequest(ContainerCommandRequestProto request,
RaftClientReply reply;
RaftClientRequest raftClientRequest =
createRaftClientRequest(request, pipelineID,
RaftClientRequest.writeRequestType(replicationLevel));
RaftClientRequest.writeRequestType());
try {
reply = server.submitClientRequestAsync(raftClientRequest).get();
} catch (Exception e) {
Expand Down
2 changes: 1 addition & 1 deletion hadoop-hdds/pom.xml
Expand Up @@ -46,7 +46,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<hdds.version>0.4.0-SNAPSHOT</hdds.version>

<!-- Apache Ratis version -->
<ratis.version>0.4.0-b600fc2-SNAPSHOT</ratis.version>
<ratis.version>0.4.0-3b0be02-SNAPSHOT</ratis.version>

<bouncycastle.version>1.60</bouncycastle.version>

Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.CheckedBiConsumer;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.slf4j.Logger;
Expand Down
Expand Up @@ -121,7 +121,19 @@ private OMConfigKeys() {
public static final String OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY
= "ozone.om.ratis.segment.preallocated.size";
public static final String OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT
= "128MB";
= "16KB";

// OM Ratis Log Appender configurations
public static final String
OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS =
"ozone.om.ratis.log.appender.queue.num-elements";
public static final int
OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT = 1024;
public static final String OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT =
"ozone.om.ratis.log.appender.queue.byte-limit";
public static final String
OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";


// OM Ratis server configurations
public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY
Expand Down
Expand Up @@ -23,7 +23,7 @@
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.ratis.util.CheckedConsumer;
import org.apache.ratis.util.function.CheckedConsumer;
import org.junit.Assert;

import java.io.IOException;
Expand Down
Expand Up @@ -49,7 +49,7 @@

import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.util.CheckedBiConsumer;
import org.apache.ratis.util.function.CheckedBiConsumer;

import java.util.function.BiConsumer;

Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.CheckedBiConsumer;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.apache.ratis.util.CollectionUtils;
import org.junit.Ignore;
import org.junit.Rule;
Expand Down
Expand Up @@ -52,7 +52,7 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.util.CheckedBiConsumer;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
Expand Down
Expand Up @@ -156,16 +156,25 @@ private RaftProperties newRaftProperties(Configuration conf) {
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
StorageUnit.BYTES);
RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
SizeInBytes.valueOf(raftSegmentPreallocatedSize));
int logAppenderQueueNumElements = conf.getInt(
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT);
final int logAppenderQueueByteLimit = (int) conf.getStorageSize(
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
RaftServerConfigKeys.Log.Appender
.setBufferElementLimit(properties, logAppenderQueueNumElements);
RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
SizeInBytes.valueOf(logAppenderQueueByteLimit));
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
SizeInBytes.valueOf(raftSegmentPreallocatedSize));

// For grpc set the maximum message size
// TODO: calculate the max message size based on the max size of a
// PutSmallFileRequest's file size limit
GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(raftSegmentPreallocatedSize));
SizeInBytes.valueOf(logAppenderQueueByteLimit));

// Set the server request timeout
TimeUnit serverRequestTimeoutUnit =
Expand Down Expand Up @@ -209,9 +218,6 @@ private RaftProperties newRaftProperties(Configuration conf) {
RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
serverMaxTimeout);

// Enable batch append on raft server
RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);

// Set the number of maximum cached segments
RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);

Expand Down
2 changes: 1 addition & 1 deletion hadoop-ozone/pom.xml
Expand Up @@ -33,7 +33,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<hadoop.version>3.2.1-SNAPSHOT</hadoop.version>
<hdds.version>0.4.0-SNAPSHOT</hdds.version>
<ozone.version>0.4.0-SNAPSHOT</ozone.version>
<ratis.version>0.4.0-b600fc2-SNAPSHOT</ratis.version>
<ratis.version>0.4.0-3b0be02-SNAPSHOT</ratis.version>
<bouncycastle.version>1.60</bouncycastle.version>
<ozone.release>Badlands</ozone.release>
<declared.ozone.version>${ozone.version}</declared.ozone.version>
Expand Down

0 comments on commit b51e9e4

Please sign in to comment.