Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions hadoop-ozone/dist/src/main/smoketest/freon/generate.robot
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ Test Timeout 5 minutes
*** Variables ***
${PREFIX} ${EMPTY}

*** Keywords ***
DN Chunk Generator
Return From Keyword If '${SECURITY_ENABLED}' == 'true'
${result} = Execute ozone freon dcg -t1 -n100 -p dcg${PREFIX}
Should contain ${result} Successful executions: 100

*** Test Cases ***
Ozone Client Key Generator
${result} = Execute ozone freon ockg ${OM_HA_PARAM} -t=1 -n=1 -p ockg${PREFIX}
Expand All @@ -33,3 +39,6 @@ OM Key Generator
OM Bucket Generator
${result} = Execute ozone freon ombg ${OM_HA_PARAM} -t=1 -n=1 -p ombg${PREFIX}
Should contain ${result} Successful executions: 1

DN Chunk Generator
DN Chunk Generator
9 changes: 9 additions & 0 deletions hadoop-ozone/dist/src/main/smoketest/freon/validate.robot
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@ Test Timeout 5 minutes
*** Variables ***
${PREFIX} ${EMPTY}

*** Keywords ***
DN Chunk Validator
Return From Keyword If '${SECURITY_ENABLED}' == 'true'
${result} = Execute ozone freon dcv -t1 -n100 -p dcg${PREFIX}
Should contain ${result} Successful executions: 100

*** Test Cases ***
Ozone Client Key Validator
${result} = Execute ozone freon ockv ${OM_HA_PARAM} -t=1 -n=1 -p ockg${PREFIX}
Should contain ${result} Successful executions: 1

DN Chunk Validator
DN Chunk Validator
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ private void writeChunk(long stepNo)
DatanodeBlockID blockId = DatanodeBlockID.newBuilder()
.setContainerID(1L)
.setLocalID(stepNo % 20)
.setBlockCommitSequenceId(stepNo)
.build();

ChunkInfo chunkInfo = ChunkInfo.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
Expand All @@ -34,6 +36,7 @@
import org.apache.hadoop.ozone.common.ChecksumData;

import com.codahale.metrics.Timer;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
Expand Down Expand Up @@ -65,13 +68,14 @@ public class DatanodeChunkValidator extends BaseFreonGenerator
defaultValue = "1024")
private int chunkSize;

private XceiverClientSpi xceiverClientSpi;
private XceiverClientSpi xceiverClient;

private Timer timer;

private ChecksumData checksumReference;

private Checksum checksum;
private ContainerProtos.ChecksumData checksumProtobuf;


@Override
Expand Down Expand Up @@ -112,18 +116,25 @@ public Void call() throws Exception {

try (XceiverClientManager xceiverClientManager =
new XceiverClientManager(ozoneConf)) {
xceiverClientSpi = xceiverClientManager.acquireClient(pipeline);
xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);

checksumProtobuf = ContainerProtos.ChecksumData.newBuilder()
.setBytesPerChecksum(4)
.setType(ContainerProtos.ChecksumType.CRC32)
.build();

readReference();

timer = getMetrics().timer("chunk-validate");

runTests(this::validateChunk);

xceiverClientManager.releaseClientForReadData(xceiverClient, true);
}

} finally {
if (xceiverClientSpi != null) {
xceiverClientSpi.close();
if (xceiverClient != null) {
xceiverClient.close();
}
}
return null;
Expand All @@ -132,75 +143,52 @@ public Void call() throws Exception {
/**
* Read a reference chunk using same name than one from the
* {@link org.apache.hadoop.ozone.freon.DatanodeChunkGenerator}.
* @throws IOException
*/
private void readReference() throws IOException {
ContainerProtos.DatanodeBlockID blockId =
ContainerProtos.DatanodeBlockID.newBuilder()
.setContainerID(1L)
.setLocalID(0 % 20)
.setBlockCommitSequenceId(0)
.build();
ContainerCommandRequestProto request = createReadChunkRequest(0);
ContainerCommandResponseProto response =
xceiverClient.sendCommand(request);

// As a reference, the first one generated (at step 0) is taken
ContainerProtos.ChunkInfo chunkInfo = ContainerProtos.ChunkInfo.newBuilder()
.setChunkName(getPrefix() + "_testdata_chunk_" + 0)
.setOffset((0 / 20) * chunkSize)
.setLen(chunkSize)
.setChecksumData(
ContainerProtos.ChecksumData.newBuilder()
.setBytesPerChecksum(4)
.setType(ContainerProtos.ChecksumType.CRC32)
.build())
.build();
checksum = new Checksum(ContainerProtos.ChecksumType.CRC32, chunkSize);
checksumReference = computeChecksum(response);
}

ContainerProtos.ReadChunkRequestProto.Builder readChunkRequest =
ContainerProtos.ReadChunkRequestProto
.newBuilder()
.setBlockID(blockId)
.setChunkData(chunkInfo);

String id = xceiverClientSpi.getPipeline().getFirstNode().getUuidString();
private void validateChunk(long stepNo) throws Exception {
ContainerCommandRequestProto request = createReadChunkRequest(stepNo);

ContainerProtos.ContainerCommandRequestProto.Builder builder =
ContainerProtos.ContainerCommandRequestProto
.newBuilder()
.setCmdType(ContainerProtos.Type.ReadChunk)
.setContainerID(blockId.getContainerID())
.setDatanodeUuid(id)
.setReadChunk(readChunkRequest);
timer.time(() -> {
try {
ContainerCommandResponseProto response =
xceiverClient.sendCommand(request);

ContainerProtos.ContainerCommandRequestProto request = builder.build();
ContainerProtos.ContainerCommandResponseProto response =
xceiverClientSpi.sendCommand(request);
ChecksumData checksumOfChunk = computeChecksum(response);

checksum = new Checksum(ContainerProtos.ChecksumType.CRC32, chunkSize);
if (response.getReadChunk().hasData()) {
checksumReference = checksum.computeChecksum(
response.getReadChunk().getData().toByteArray());
} else {
checksumReference = checksum.computeChecksum(
response.getReadChunk().getDataBuffers().getBuffersList());
}
if (!checksumReference.equals(checksumOfChunk)) {
throw new IllegalStateException(
"Reference (=first) message checksum doesn't match " +
"with checksum of chunk "
+ response.getReadChunk()
.getChunkData().getChunkName());
}
} catch (IOException e) {
LOG.warn("Could not read chunk due to IOException: ", e);
}
});

}


private void validateChunk(long stepNo) throws Exception {
private ContainerCommandRequestProto createReadChunkRequest(long stepNo)
throws IOException {
ContainerProtos.DatanodeBlockID blockId =
ContainerProtos.DatanodeBlockID.newBuilder()
.setContainerID(1L)
.setLocalID(stepNo % 20)
.setBlockCommitSequenceId(stepNo)
.build();

ContainerProtos.ChunkInfo chunkInfo = ContainerProtos.ChunkInfo.newBuilder()
.setChunkName(getPrefix() + "_testdata_chunk_" + stepNo)
.setChecksumData(
ContainerProtos.ChecksumData.newBuilder()
.setBytesPerChecksum(4)
.setType(ContainerProtos.ChecksumType.CRC32)
.build())
.setChecksumData(checksumProtobuf)
.setOffset((stepNo / 20) * chunkSize)
.setLen(chunkSize)
.build();
Expand All @@ -211,44 +199,28 @@ private void validateChunk(long stepNo) throws Exception {
.setBlockID(blockId)
.setChunkData(chunkInfo);

String id = xceiverClientSpi.getPipeline().getFirstNode().getUuidString();
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();

ContainerProtos.ContainerCommandRequestProto.Builder builder =
ContainerProtos.ContainerCommandRequestProto
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto
.newBuilder()
.setCmdType(ContainerProtos.Type.ReadChunk)
.setContainerID(blockId.getContainerID())
.setDatanodeUuid(id)
.setReadChunk(readChunkRequest);

ContainerProtos.ContainerCommandRequestProto request = builder.build();

timer.time(() -> {
try {
ContainerProtos.ContainerCommandResponseProto response =
xceiverClientSpi.sendCommand(request);

ChecksumData checksumOfChunk;
if (response.getReadChunk().hasData()) {
checksumOfChunk = checksum.computeChecksum(
response.getReadChunk().getData().toByteArray());
} else {
checksumOfChunk = checksum.computeChecksum(
response.getReadChunk().getDataBuffers().getBuffersList());
}

if (!checksumReference.equals(checksumOfChunk)) {
throw new IllegalStateException(
"Reference (=first) message checksum doesn't match " +
"with checksum of chunk "
+ response.getReadChunk()
.getChunkData().getChunkName());
}
} catch (IOException e) {
LOG.warn("Could not read chunk due to IOException: ", e);
}
});
return builder.build();
}

private ChecksumData computeChecksum(ContainerCommandResponseProto response)
throws OzoneChecksumException {
ContainerProtos.ReadChunkResponseProto readChunk = response.getReadChunk();
if (readChunk.hasData()) {
return checksum.computeChecksum(readChunk.getData().toByteArray());
} else {
return checksum.computeChecksum(
readChunk.getDataBuffers().getBuffersList());
}
}


Expand Down