Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -445,7 +446,8 @@ protected synchronized void setOmProxies(Map<String,
}

protected synchronized void setOmNodeIDList(List<String> omNodeIDList) {
this.omNodeIDList = omNodeIDList;
Collections.shuffle(omNodeIDList);
this.omNodeIDList = Collections.unmodifiableList(omNodeIDList);
}

protected synchronized List<String> getOmNodeIDList() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.freon;

import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.kohsuke.MetaInfServices;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Command;

/**
* Data generator tool test om performance.
*/
@Command(name = "fr",
aliases = "follower-reader",
description = "Read the same keySize from multiple threads.",
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true,
showDefaultValues = true)
@MetaInfServices(FreonSubcommand.class)
public class FollowerReader extends BaseFreonGenerator
implements Callable<Void> {

private static final Logger LOG =
LoggerFactory.getLogger(FollowerReader.class);

@CommandLine.Option(names = {"-v", "--volume"},
description = "Name of the bucket which contains the test data. Will be"
+ " created if missing.",
defaultValue = "vol1")
private String volumeName;

@CommandLine.Option(names = {"-b", "--bucket"},
description = "Name of the bucket which contains the test data.",
defaultValue = "bucket1")
private String bucketName;

@CommandLine.Option(names = {"-k", "--key"},
description = "Name of the key which contains the test data.",
defaultValue = "key1")
private String keyName;

private String omServiceID = null;

private Timer timer;

private final List<OzoneClient> rpcClients = new ArrayList<>();

@Override
public Void call() throws Exception {
init();
OzoneConfiguration ozoneConfiguration = createOzoneConfiguration();

for (int i = 0; i < getThreadNo(); i++) {
OzoneClient rpcClient = createOzoneClient(omServiceID, ozoneConfiguration);
rpcClients.add(rpcClient);
}

timer = getMetrics().timer("follower-read");

runTests(this::readKeySize);
return null;
}

private void readKeySize(long counter) throws Exception {
int clientIdx = (int) (counter % rpcClients.size());
timer.time(() -> {
long unused = rpcClients.get(clientIdx).getObjectStore().getVolume(volumeName)
.getBucket(bucketName).getKey(keyName).getDataSize();
return null;
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void init(TestInfo testInfo) throws Exception {
conf.getObject(OzoneManagerRatisServerConfig.class);
omRatisConf.setLogAppenderWaitTimeMin(10);
conf.setFromObject(omRatisConf);
conf.set("ozone.om.client.rpc.timeout", "1m");

cluster = MiniOzoneCluster.newHABuilder(conf)
.setOMServiceId("om-service-test1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public void init(TestInfo testInfo) throws Exception {
OzoneManagerRatisServerConfig omRatisConf = conf.getObject(OzoneManagerRatisServerConfig.class);
omRatisConf.setLogAppenderWaitTimeMin(10);
conf.setFromObject(omRatisConf);
conf.set("ozone.om.client.rpc.timeout", "1m");
conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16, StorageUnit.KB);
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ public OMExecutionFlow(OzoneManager om) {
* @return OMResponse the response of execution
* @throws ServiceException the exception on execution
*/
public OMResponse submit(OMRequest omRequest) throws ServiceException {
public OMResponse submit(OMRequest omRequest, boolean isWrite) throws ServiceException {
// TODO: currently have only execution after ratis submission, but with new flow can have switch later
return submitExecutionToRatis(omRequest);
return submitExecutionToRatis(omRequest, isWrite);
}

private OMResponse submitExecutionToRatis(OMRequest request) throws ServiceException {
private OMResponse submitExecutionToRatis(OMRequest request, boolean isWrite) throws ServiceException {
// 1. create client request and preExecute
OMClientRequest omClientRequest = null;
final OMRequest requestToSubmit;
Expand All @@ -73,7 +73,7 @@ private OMResponse submitExecutionToRatis(OMRequest request) throws ServiceExcep
}

// 2. submit request to ratis
OMResponse response = ozoneManager.getOmRatisServer().submitRequest(requestToSubmit);
OMResponse response = ozoneManager.getOmRatisServer().submitRequest(requestToSubmit, isWrite);
if (!response.getSuccess()) {
omClientRequest.handleRequestFailure(ozoneManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Read;
import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
Expand Down Expand Up @@ -124,6 +125,7 @@ public final class OzoneManagerRatisServer {

private final ClientId clientId = ClientId.randomId();
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
private final Read.Option readOption;

private static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
Expand Down Expand Up @@ -171,6 +173,8 @@ private OzoneManagerRatisServer(ConfigurationSource conf, OzoneManager om,
}
this.omStateMachine = getStateMachine(conf);

this.readOption = RaftServerConfigKeys.Read.option(serverProperties);

Parameters parameters = createServerTlsParameters(secConfig, certClient);
this.server = RaftServer.newBuilder()
.setServerId(this.raftPeerId)
Expand Down Expand Up @@ -239,11 +243,11 @@ public static OzoneManagerRatisServer newOMRatisServer(
* @return OMResponse - response returned to the client.
* @throws ServiceException
*/
public OMResponse submitRequest(OMRequest omRequest) throws ServiceException {
public OMResponse submitRequest(OMRequest omRequest, boolean isWrite) throws ServiceException {
// In prepare mode, only prepare and cancel requests are allowed to go
// through.
if (ozoneManager.getPrepareState().requestAllowed(omRequest.getCmdType())) {
RaftClientRequest raftClientRequest = createRaftRequest(omRequest);
RaftClientRequest raftClientRequest = createRaftRequest(omRequest, isWrite);
RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
return createOmResponse(omRequest, raftClientReply);
} else {
Expand Down Expand Up @@ -277,10 +281,10 @@ private RaftClientReply submitRequestToRatis(
() -> submitRequestToRatisImpl(raftClientRequest));
}

private RaftClientRequest createRaftRequest(OMRequest omRequest) {
private RaftClientRequest createRaftRequest(OMRequest omRequest, boolean isWrite) {
return captureLatencyNs(
perfMetrics.getCreateRatisRequestLatencyNs(),
() -> createRaftRequestImpl(omRequest));
() -> createRaftRequestImpl(omRequest, isWrite));
}

/**
Expand Down Expand Up @@ -500,7 +504,7 @@ public void removeRaftPeer(OMNodeDetails omNodeDetails) {
* @return RaftClientRequest - Raft Client request which is submitted to
* ratis server.
*/
private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) {
private RaftClientRequest createRaftRequestImpl(OMRequest omRequest, boolean isWrite) {
return RaftClientRequest.newBuilder()
.setClientId(getClientId())
.setServerId(server.getId())
Expand All @@ -509,7 +513,7 @@ private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) {
.setMessage(
Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.setType(isWrite ? RaftClientRequest.writeRequestType() : RaftClientRequest.readRequestType())
.build();
}

Expand Down Expand Up @@ -647,6 +651,10 @@ public RaftServer.Division getServerDivision() {
return serverDivision.get();
}

public boolean isLinearizableRead() {
return readOption == Read.Option.LINEARIZABLE;
}

/**
* Initializes and returns OzoneManager StateMachine.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,27 @@ public class OzoneManagerRatisServerConfig {
)
private long retryCacheTimeout = Duration.ofSeconds(300).toMillis();

@Config(key = "read.option",
defaultValue = "DEFAULT",
type = ConfigType.STRING,
tags = {OZONE, OM, RATIS, PERFORMANCE},
description = "Select the Ratis server read option." +
" Possible values are: " +
" DEFAULT - Directly query statemachine (non-linearizable). " +
" Only the leader can serve read requests. " +
Comment on lines +61 to +62
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
" DEFAULT - Directly query statemachine (non-linearizable). " +
" Only the leader can serve read requests. " +
" DEFAULT - Directly query statemachine. " +
" Only the leader can serve read requests. " +

IIUC, the current DEFAULT is still linearizable. To avoid confusion, let’s rename the modes to something more explicit—for example:

Then we can add our own parsing that maps these OM-level configs to the corresponding Ratis read options, instead of relying directly on RaftServerConfigKeys.Read.option(...).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this configuration is using directly from Ratis. If changed, OM Ratis Server won't start normally.

The thing is the localLease is not a 1:1 mapping, if locallease condition check doesn't fit, it will fallback to ReadIndex which is linearizable.

" LINEARIZABLE - Use ReadIndex (see Raft Paper section 6.4) to maintain linearizability. " +
" Both the leader and the followers can serve read requests."
)
private String readOption;

@Config(key = "read.leader.lease.enabled",
defaultValue = "false",
type = ConfigType.BOOLEAN,
tags = {OZONE, OM, RATIS, PERFORMANCE},
description = "If we enabled the leader lease on Ratis Leader."
)
private boolean readLeaderLeaseEnabled;

public long getLogAppenderWaitTimeMin() {
return logAppenderWaitTimeMin;
}
Expand All @@ -67,4 +88,20 @@ public long getRetryCacheTimeout() {
public void setRetryCacheTimeout(Duration duration) {
this.retryCacheTimeout = duration.toMillis();
}

public String getReadOption() {
return readOption;
}

public void setReadOption(String option) {
this.readOption = option;
}

public boolean isReadLeaderLeaseEnabled() {
return readLeaderLeaseEnabled;
}

public void setReadLeaderLeaseEnabled(boolean readLeaderLeaseEnabled) {
this.readLeaderLeaseEnabled = readLeaderLeaseEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws ServiceExcep
}

this.lastRequestToSubmit = request;
return ozoneManager.getOmExecutionFlow().submit(request);
return ozoneManager.getOmExecutionFlow().submit(request, true);
} finally {
OzoneManager.setS3Auth(null);
}
Expand All @@ -184,6 +184,10 @@ public OMRequest getLastRequestToSubmit() {

private OMResponse submitReadRequestToOM(OMRequest request)
throws ServiceException {
// Read from leader or followers using linearizable read
if (omRatisServer.isLinearizableRead()) {
return ozoneManager.getOmExecutionFlow().submit(request, false);
}
// Check if this OM is the leader.
RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
if (raftServerStatus == LEADER_AND_READY ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testAccessContorlExceptionFailovers() throws Exception {
// Request should try all OMs one be one and fail when the last OM also
// throws AccessControlException.
assertThat(serviceException).hasCauseInstanceOf(AccessControlException.class)
.hasMessage("ServiceException of type class org.apache.hadoop.security.AccessControlException for om3");
.hasMessageStartingWith("ServiceException of type class org.apache.hadoop.security.AccessControlException");
assertThat(logCapturer.getOutput()).contains(getRetryProxyDebugMsg("om1"));
assertThat(logCapturer.getOutput()).contains(getRetryProxyDebugMsg("om2"));
assertThat(logCapturer.getOutput()).contains(getRetryProxyDebugMsg("om3"));
Expand Down