Skip to content

Commit

Permalink
HDDS-799. Avoid ByteString to byte array conversion cost by using Byt…
Browse files Browse the repository at this point in the history
…eBuffer in Datanode. Contributed by Mukul Kumar Singh.
  • Loading branch information
bshashikant committed Nov 5, 2018
1 parent c8ca174 commit 942693b
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 47 deletions.
Expand Up @@ -20,6 +20,7 @@

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -76,7 +77,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
Expand Down Expand Up @@ -652,10 +653,10 @@ ContainerCommandResponseProto handleWriteChunk(
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
Preconditions.checkNotNull(chunkInfo);

byte[] data = null;
ByteBuffer data = null;
if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
request.getWriteChunk().getStage() == Stage.COMBINED) {
data = request.getWriteChunk().getData().toByteArray();
data = request.getWriteChunk().getData().asReadOnlyByteBuffer();
}

chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
Expand Down Expand Up @@ -713,7 +714,7 @@ ContainerCommandResponseProto handlePutSmallFile(
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
putSmallFileReq.getChunkInfo());
Preconditions.checkNotNull(chunkInfo);
byte[] data = putSmallFileReq.getData().toByteArray();
ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer();
// chunks will be committed as a part of handling putSmallFile
// here. There is no need to maintain this info in openContainerBlockMap.
chunkManager.writeChunk(
Expand All @@ -724,7 +725,7 @@ ContainerCommandResponseProto handlePutSmallFile(
blockData.setChunks(chunks);
// TODO: add bcsId as a part of putSmallFile transaction
blockManager.putBlock(kvContainer, blockData);
metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity());

} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
Expand Down
Expand Up @@ -73,15 +73,15 @@ private ChunkUtils() {
* @throws StorageContainerException
*/
public static void writeData(File chunkFile, ChunkInfo chunkInfo,
byte[] data, VolumeIOStats volumeIOStats) throws
StorageContainerException, ExecutionException, InterruptedException,
NoSuchAlgorithmException {

ByteBuffer data, VolumeIOStats volumeIOStats)
throws StorageContainerException, ExecutionException,
InterruptedException, NoSuchAlgorithmException {
int bufferSize = data.capacity();
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
if (data.length != chunkInfo.getLen()) {
if (bufferSize != chunkInfo.getLen()) {
String err = String.format("data array does not match the length " +
"specified. DataLen: %d Byte Array: %d",
chunkInfo.getLen(), data.length);
chunkInfo.getLen(), bufferSize);
log.error(err);
throw new StorageContainerException(err, INVALID_WRITE_SIZE);
}
Expand All @@ -103,16 +103,16 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo,
StandardOpenOption.SPARSE,
StandardOpenOption.SYNC);
lock = file.lock().get();
int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
int size = file.write(data, chunkInfo.getOffset()).get();
// Increment volumeIO stats here.
volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart);
volumeIOStats.incWriteOpCount();
volumeIOStats.incWriteBytes(size);
if (size != data.length) {
if (size != bufferSize) {
log.error("Invalid write size found. Size:{} Expected: {} ", size,
data.length);
bufferSize);
throw new StorageContainerException("Invalid write size found. " +
"Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
"Size: " + size + " Expected: " + bufferSize, INVALID_WRITE_SIZE);
}
} catch (StorageContainerException ex) {
throw ex;
Expand Down Expand Up @@ -183,7 +183,8 @@ public static ByteBuffer readData(File chunkFile, ChunkInfo data,
volumeIOStats.incReadOpCount();
volumeIOStats.incReadBytes(data.getLen());
if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
verifyChecksum(data, buf.array(), log);
buf.rewind();
verifyChecksum(data, buf, log);
}
return buf;
} catch (IOException e) {
Expand Down Expand Up @@ -211,10 +212,11 @@ public static ByteBuffer readData(File chunkFile, ChunkInfo data,
* @throws NoSuchAlgorithmException
* @throws StorageContainerException
*/
private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
log) throws NoSuchAlgorithmException, StorageContainerException {
private static void verifyChecksum(ChunkInfo chunkInfo, ByteBuffer data,
Logger log) throws NoSuchAlgorithmException, StorageContainerException {
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
sha.update(data);
data.rewind();
if (!Hex.encodeHexString(sha.digest()).equals(
chunkInfo.getChecksum())) {
log.error("Checksum mismatch. Provided: {} , computed: {}",
Expand Down
Expand Up @@ -66,7 +66,7 @@ public class ChunkManagerImpl implements ChunkManager {
* @throws StorageContainerException
*/
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
byte[] data, ContainerProtos.Stage stage)
ByteBuffer data, ContainerProtos.Stage stage)
throws StorageContainerException {

try {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import java.nio.ByteBuffer;

/**
* Chunk Manager allows read, write, delete and listing of chunks in
Expand All @@ -41,7 +42,7 @@ public interface ChunkManager {
* @throws StorageContainerException
*/
void writeChunk(Container container, BlockID blockID, ChunkInfo info,
byte[] data, ContainerProtos.Stage stage)
ByteBuffer data, ContainerProtos.Stage stage)
throws StorageContainerException;

/**
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.mockito.Mockito;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;

Expand Down Expand Up @@ -109,8 +110,8 @@ public void testWriteChunkStageWriteAndCommit() throws Exception {

// As no chunks are written to the volume writeBytes should be 0
checkWriteIOStats(0, 0);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.WRITE_DATA);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA);
// Now a chunk file is being written with Stage WRITE_DATA, so it should
// create a temporary chunk file.
assertTrue(chunksPath.listFiles().length == 1);
Expand All @@ -126,8 +127,8 @@ public void testWriteChunkStageWriteAndCommit() throws Exception {

checkWriteIOStats(data.length, 1);

chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMMIT_DATA);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMMIT_DATA);

checkWriteIOStats(data.length, 1);

Expand All @@ -146,8 +147,8 @@ public void testWriteChunkIncorrectLength() throws Exception {
long randomLength = 200L;
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, randomLength);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.WRITE_DATA);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA);
fail("testWriteChunkIncorrectLength failed");
} catch (StorageContainerException ex) {
// As we got an exception, writeBytes should be 0.
Expand All @@ -167,8 +168,8 @@ public void testWriteChunkStageCombinedData() throws Exception {
// Initially chunks folder should be empty.
assertTrue(chunksPath.listFiles().length == 0);
checkWriteIOStats(0, 0);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
// Now a chunk file is being written with Stage COMBINED_DATA, so it should
// create a chunk file.
assertTrue(chunksPath.listFiles().length == 1);
Expand All @@ -180,8 +181,8 @@ public void testWriteChunkStageCombinedData() throws Exception {
@Test
public void testReadChunk() throws Exception {
checkWriteIOStats(0, 0);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
checkWriteIOStats(data.length, 1);
checkReadIOStats(0, 0);
byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
Expand All @@ -194,8 +195,8 @@ public void testReadChunk() throws Exception {
@Test
public void testDeleteChunk() throws Exception {
File chunksPath = new File(keyValueContainerData.getChunksPath());
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
assertTrue(chunksPath.listFiles().length == 1);
chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
assertTrue(chunksPath.listFiles().length == 0);
Expand All @@ -204,8 +205,8 @@ public void testDeleteChunk() throws Exception {
@Test
public void testDeleteChunkUnsupportedRequest() throws Exception {
try {
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
long randomLength = 200L;
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, randomLength);
Expand All @@ -224,8 +225,8 @@ public void testWriteChunkChecksumMismatch() throws Exception {
.getLocalID(), 0), 0, data.length);
//Setting checksum to some value.
chunkInfo.setChecksum("some garbage");
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
fail("testWriteChunkChecksumMismatch failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Checksum mismatch.", ex);
Expand All @@ -252,8 +253,8 @@ public void testWriteAndReadChunkMultipleTimes() throws Exception {
for (int i=0; i<100; i++) {
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), i), 0, data.length);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
}
checkWriteIOStats(data.length*100, 100);
assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);
Expand Down
Expand Up @@ -59,6 +59,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -330,7 +331,8 @@ private ChunkInfo writeChunkHelper(BlockID blockID)
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
return info;

}
Expand Down Expand Up @@ -371,7 +373,8 @@ public void testWritReadManyChunks() throws IOException,
ChunkInfo info = getChunk(blockID.getLocalID(), x, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
fileHashMap.put(fileName, info);
}
Expand Down Expand Up @@ -431,7 +434,8 @@ public void testPartialRead() throws Exception {
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);

byte[] readData = chunkManager.readChunk(container, blockID, info);
assertTrue(Arrays.equals(data, readData));
Expand Down Expand Up @@ -463,11 +467,14 @@ public void testOverWrite() throws IOException,
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
// With the overwrite flag it should work now.
info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
long bytesUsed = container.getContainerData().getBytesUsed();
Assert.assertEquals(datalen, bytesUsed);

Expand Down Expand Up @@ -501,7 +508,8 @@ public void testMultipleWriteSingleRead() throws IOException,
byte[] data = getData(datalen);
oldSha.update(data);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
}

// Request to read the whole data in a single go.
Expand Down Expand Up @@ -532,7 +540,8 @@ public void testDeleteChunk() throws IOException,
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
chunkManager.deleteChunk(container, blockID, info);
exception.expect(StorageContainerException.class);
exception.expectMessage("Unable to find the chunk file.");
Expand Down Expand Up @@ -646,7 +655,8 @@ public void testPutBlockWithLotsOfChunks() throws IOException,
info = getChunk(blockID.getLocalID(), x, x * datalen, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
totalSize += datalen;
chunkList.add(info);
}
Expand Down

0 comments on commit 942693b

Please sign in to comment.