Skip to content

Commit

Permalink
HDDS-339. Add block length and blockId in PutKeyResponse. Contributed…
Browse files Browse the repository at this point in the history
… by Shashikant Banerjee.
  • Loading branch information
mukul1987 committed Aug 10, 2018
1 parent 15241c6 commit 398d895
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 18 deletions.
Expand Up @@ -308,6 +308,7 @@ message PutKeyRequestProto {
}

message PutKeyResponseProto {
required GetCommittedBlockLengthResponseProto committedBlockLength = 1;
}

message GetKeyRequestProto {
Expand Down
Expand Up @@ -421,6 +421,7 @@ ContainerCommandResponseProto handleCloseContainer(
ContainerCommandResponseProto handlePutKey(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {

long blockLength;
if (!request.hasPutKey()) {
LOG.debug("Malformed Put Key request. trace ID: {}",
request.getTraceID());
Expand All @@ -433,7 +434,7 @@ ContainerCommandResponseProto handlePutKey(
KeyData keyData = KeyData.getFromProtoBuf(
request.getPutKey().getKeyData());
long numBytes = keyData.getProtoBufMessage().toByteArray().length;
commitKey(keyData, kvContainer);
blockLength = commitKey(keyData, kvContainer);
metrics.incContainerBytesStats(Type.PutKey, numBytes);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
Expand All @@ -443,7 +444,7 @@ ContainerCommandResponseProto handlePutKey(
request);
}

return KeyUtils.getKeyResponseSuccess(request);
return KeyUtils.putKeyResponseSuccess(request, blockLength);
}

private void commitPendingKeys(KeyValueContainer kvContainer)
Expand All @@ -456,12 +457,13 @@ private void commitPendingKeys(KeyValueContainer kvContainer)
}
}

private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
private long commitKey(KeyData keyData, KeyValueContainer kvContainer)
throws IOException {
Preconditions.checkNotNull(keyData);
keyManager.putKey(kvContainer, keyData);
long length = keyManager.putKey(kvContainer, keyData);
//update the open key Map in containerManager
this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
return length;
}
/**
* Handle Get Key operation. Calls KeyManager to process the request.
Expand Down Expand Up @@ -662,8 +664,12 @@ ContainerCommandResponseProto handleWriteChunk(
request.getWriteChunk().getStage() == Stage.COMBINED) {
metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
.getChunkData().getLen());
// the openContainerBlockMap should be updated only while writing data
// not during COMMIT_STAGE of handling write chunk request.
}

if (request.getWriteChunk().getStage() == Stage.COMMIT_DATA
|| request.getWriteChunk().getStage() == Stage.COMBINED) {
// the openContainerBlockMap should be updated only during
// COMMIT_STAGE of handling write chunk request.
openContainerBlockMap.addChunk(blockID, chunkInfoProto);
}
} catch (StorageContainerException ex) {
Expand Down
Expand Up @@ -27,6 +27,10 @@
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetKeyResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
GetCommittedBlockLengthResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
PutKeyResponseProto;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
Expand Down Expand Up @@ -122,6 +126,26 @@ public static KeyData getKeyData(byte[] bytes) throws IOException {
}
}

/**
* Returns putKey response success.
* @param msg - Request.
* @return Response.
*/
public static ContainerCommandResponseProto putKeyResponseSuccess(
ContainerCommandRequestProto msg, long blockLength) {
GetCommittedBlockLengthResponseProto.Builder
committedBlockLengthResponseBuilder =
getCommittedBlockLengthResponseBuilder(blockLength,
msg.getPutKey().getKeyData().getBlockID());
PutKeyResponseProto.Builder putKeyResponse =
PutKeyResponseProto.newBuilder();
putKeyResponse
.setCommittedBlockLength(committedBlockLengthResponseBuilder);
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getSuccessResponseBuilder(msg);
builder.setPutKey(putKeyResponse);
return builder.build();
}
/**
* Returns successful keyResponse.
* @param msg - Request.
Expand Down Expand Up @@ -150,18 +174,26 @@ public static ContainerCommandResponseProto getKeyDataResponse(
* @param msg - Request.
* @return Response.
*/
public static ContainerProtos.ContainerCommandResponseProto
getBlockLengthResponse(ContainerProtos.
ContainerCommandRequestProto msg, long blockLength) {
public static ContainerCommandResponseProto getBlockLengthResponse(
ContainerCommandRequestProto msg, long blockLength) {
GetCommittedBlockLengthResponseProto.Builder
committedBlockLengthResponseBuilder =
getCommittedBlockLengthResponseBuilder(blockLength,
msg.getGetCommittedBlockLength().getBlockID());
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getSuccessResponseBuilder(msg);
builder.setGetCommittedBlockLength(committedBlockLengthResponseBuilder);
return builder.build();
}

private static GetCommittedBlockLengthResponseProto.Builder
getCommittedBlockLengthResponseBuilder(
long blockLength, ContainerProtos.DatanodeBlockID blockID) {
ContainerProtos.GetCommittedBlockLengthResponseProto.Builder
getCommittedBlockLengthResponseBuilder = ContainerProtos.
GetCommittedBlockLengthResponseProto.newBuilder();
getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength);
getCommittedBlockLengthResponseBuilder
.setBlockID(msg.getGetCommittedBlockLength().getBlockID());
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getSuccessResponseBuilder(msg);
builder.setGetCommittedBlockLength(getCommittedBlockLengthResponseBuilder);
return builder.build();
getCommittedBlockLengthResponseBuilder.setBlockID(blockID);
return getCommittedBlockLengthResponseBuilder;
}
}
Expand Up @@ -67,9 +67,10 @@ public KeyManagerImpl(Configuration conf) {
*
* @param container - Container for which key need to be added.
* @param data - Key Data.
* @return length of the key.
* @throws IOException
*/
public void putKey(Container container, KeyData data) throws IOException {
public long putKey(Container container, KeyData data) throws IOException {
Preconditions.checkNotNull(data, "KeyData cannot be null for put " +
"operation.");
Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
Expand All @@ -87,6 +88,7 @@ public void putKey(Container container, KeyData data) throws IOException {

// Increment keycount here
container.getContainerData().incrKeyCount();
return data.getSize();
}

/**
Expand Down
Expand Up @@ -35,9 +35,10 @@ public interface KeyManager {
*
* @param container - Container for which key need to be added.
* @param data - Key Data.
* @return length of the Key.
* @throws IOException
*/
void putKey(Container container, KeyData data) throws IOException;
long putKey(Container container, KeyData data) throws IOException;

/**
* Gets an existing key.
Expand Down
Expand Up @@ -52,7 +52,7 @@
/**
* Test Container calls.
*/
public class TestCommittedBlockLengthAPI {
public class TestGetCommittedBlockLengthAndPutKey {

private static MiniOzoneCluster cluster;
private static OzoneConfiguration ozoneConfig;
Expand Down Expand Up @@ -213,4 +213,42 @@ public void testGetCommittedBlockLengthForOpenBlock() throws Exception {
Assert.assertTrue(response.getBlockLength() == 1024);
xceiverClientManager.releaseClient(client);
}

@Test
public void tesPutKeyResposne() throws Exception {
ContainerProtos.PutKeyResponseProto response;
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
long containerID = container.getContainerInfo().getContainerID();
Pipeline pipeline = container.getPipeline();
XceiverClientSpi client =
xceiverClientManager.acquireClient(pipeline, containerID);
//create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID);

BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
byte[] data =
RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper
.getWriteChunkRequest(container.getPipeline(), blockID,
data.length);
client.sendCommand(writeChunkRequest);
// Now, explicitly make a putKey request for the block.
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
ContainerTestHelper
.getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
response = client.sendCommand(putKeyRequest).getPutKey();
// make sure the block ids in the request and response are same.
// This will also ensure that closing the container committed the block
// on the Datanodes.
Assert.assertEquals(BlockID
.getFromProtobuf(response.getCommittedBlockLength().getBlockID()),
blockID);
Assert.assertEquals(
response.getCommittedBlockLength().getBlockLength(), data.length);
xceiverClientManager.releaseClient(client);
}
}

0 comments on commit 398d895

Please sign in to comment.