Skip to content

Commit

Permalink
HDDS-10593. Prefer client read from IN_SERVICE datanodes (#6449)
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen01 committed Apr 9, 2024
1 parent 0f43dbc commit 9b248a0
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,6 +44,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -384,6 +387,12 @@ private XceiverClientReply sendCommandWithRetry(
}
}

boolean allInService = datanodeList.stream()
.allMatch(dn -> dn.getPersistedOpState() == NodeOperationalState.IN_SERVICE);
if (!allInService) {
datanodeList = sortDatanodeByOperationalState(datanodeList);
}

for (DatanodeDetails dn : datanodeList) {
try {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -447,6 +456,30 @@ private XceiverClientReply sendCommandWithRetry(
}
}

/**
 * Returns a copy of the given list in which every datanode whose persisted
 * operational state is IN_SERVICE precedes every datanode in any other
 * state. The list passed in is left untouched.
 *
 * @param datanodeList datanodes to order
 * @return a new list with the IN_SERVICE datanodes first
 */
private static List<DatanodeDetails> sortDatanodeByOperationalState(
    List<DatanodeDetails> datanodeList) {
  List<DatanodeDetails> ordered = new ArrayList<>(datanodeList);
  // The sort key is false for IN_SERVICE and true for anything else; false
  // orders before true. Datanodes sharing a key compare as equal, so the
  // stable sort keeps their original relative order.
  Comparator<DatanodeDetails> inServiceFirst = Comparator.comparing(
      (DatanodeDetails dn) ->
          dn.getPersistedOpState() != NodeOperationalState.IN_SERVICE);
  ordered.sort(inServiceFirst);
  return ordered;
}

@Override
public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
Expand Down Expand Up @@ -174,6 +178,39 @@ public XceiverClientReply sendCommandAsync(
assertEquals(1, seenDNs.size());
}

/**
 * Checks that, when some datanodes of a pipeline have been put into
 * IN_MAINTENANCE, the first datanode a read is sent to is still one whose
 * persisted operational state is IN_SERVICE. Repeated 100 times with
 * randomly chosen maintenance nodes.
 */
@Test
public void testPrimaryReadFromNormalDatanode()
    throws IOException {
  for (int i = 0; i < 100; i++) {
    // Record the contacted datanodes per iteration. A list shared across
    // iterations would make get(0) always return the datanode seen in the
    // very first iteration, so later iterations would verify nothing.
    final List<DatanodeDetails> seenDNs = new ArrayList<>();
    Pipeline randomPipeline = MockPipeline.createRatisPipeline();
    int nodeCount = randomPipeline.getNodes().size();
    assertThat(nodeCount).isGreaterThan(1);
    randomPipeline.getNodes().forEach(
        node -> assertEquals(NodeOperationalState.IN_SERVICE, node.getPersistedOpState()));

    // Put up to two randomly chosen nodes into maintenance (both picks may
    // land on the same node).
    // NOTE(review): presumably the mock pipeline has three nodes so at least
    // one stays IN_SERVICE - confirm against MockPipeline.
    randomPipeline.getNodes().get(
        RandomUtils.nextInt(0, nodeCount)).setPersistedOpState(NodeOperationalState.IN_MAINTENANCE);
    randomPipeline.getNodes().get(
        RandomUtils.nextInt(0, nodeCount)).setPersistedOpState(NodeOperationalState.IN_MAINTENANCE);
    // No catch block: an IOException from the read must fail the test
    // instead of being printed and ignored.
    try (XceiverClientGrpc client = new XceiverClientGrpc(randomPipeline, conf) {
      @Override
      public XceiverClientReply sendCommandAsync(
          ContainerProtos.ContainerCommandRequestProto request,
          DatanodeDetails dn) {
        seenDNs.add(dn);
        return buildValidResponse();
      }
    }) {
      invokeXceiverClientGetBlock(client);
    }
    // The read must have reached at least one datanode, and the first one
    // contacted must be IN_SERVICE.
    assertThat(seenDNs).isNotEmpty();
    assertEquals(NodeOperationalState.IN_SERVICE, seenDNs.get(0).getPersistedOpState());
  }
}

@Test
public void testConnectionReusedAfterGetBlock() throws IOException {
// With a new Client, make 100 calls. On each call, ensure that only one
Expand Down

0 comments on commit 9b248a0

Please sign in to comment.