Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.ozone.common.ChunkBuffer;
Expand Down Expand Up @@ -165,6 +166,18 @@ public static ContainerCommandResponseProto getBlockDataResponse(
.build();
}

public static ContainerCommandResponseProto getListBlockResponse(
ContainerCommandRequestProto msg, List<BlockData> data) {

ListBlockResponseProto.Builder builder =
ListBlockResponseProto.newBuilder();
for(int i = 0; i < data.size(); i++) {
builder.addBlockData(data.get(i));
}
return getSuccessResponseBuilder(msg)
.setListBlock(builder)
.build();
}
/**
* Returns successful getCommittedBlockLength Response.
* @param msg - Request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockResponseSuccess;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse;
Expand Down Expand Up @@ -214,7 +215,7 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
case DeleteBlock:
return handler.handleDeleteBlock(request, kvContainer);
case ListBlock:
return handler.handleUnsupportedOp(request);
return handler.handleListBlock(request, kvContainer);
case ReadChunk:
return handler.handleReadChunk(request, kvContainer, dispatcherContext);
case DeleteChunk:
Expand Down Expand Up @@ -545,6 +546,43 @@ ContainerCommandResponseProto handleGetCommittedBlockLength(
return getBlockLengthResponse(request, blockLength);
}

/**
* Handle List Block operation. Calls BlockManager to process the request.
*/
ContainerCommandResponseProto handleListBlock(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {

if (!request.hasListBlock()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Malformed list block request. trace ID: {}",
request.getTraceID());
}
return malformedRequest(request);
}

List<BlockData> responseData;
List<ContainerProtos.BlockData> returnData = new ArrayList();
try {
int count = request.getListBlock().getCount();
long startLocalId = -1;
if (request.getListBlock().hasStartLocalID()) {
startLocalId = request.getListBlock().getStartLocalID();
}
responseData = blockManager.listBlock(kvContainer, startLocalId, count);
for (int i = 0; i < responseData.size(); i++) {
returnData.add(responseData.get(i).getProtoBufMessage());
}
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
return ContainerUtils.logAndReturnError(LOG,
new StorageContainerException("List blocks failed", ex, IO_EXCEPTION),
request);
}

return getListBlockResponse(request, returnData);
}

/**
* Handle Delete Block operation. Calls BlockManager to process the request.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ public void deleteBlock(Container container, BlockID blockID) throws
public List<BlockData> listBlock(Container container, long startLocalID, int
count) throws IOException {
Preconditions.checkNotNull(container, "container cannot be null");
Preconditions.checkState(startLocalID >= 0, "startLocal ID cannot be " +
"negative");
Preconditions.checkState(startLocalID >= 0 || startLocalID == -1,
"startLocal ID cannot be negative");
Preconditions.checkArgument(count > 0,
"Count must be a positive number.");
container.readLock();
Expand All @@ -332,11 +332,11 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
result = new ArrayList<>();
List<? extends Table.KeyValue<String, BlockData>> range =
db.getStore().getBlockDataTable()
.getSequentialRangeKVs(Long.toString(startLocalID), count,
.getSequentialRangeKVs(startLocalID == -1 ? null :
Long.toString(startLocalID), count,
MetadataKeyFilters.getUnprefixedKeyFilter());
for (Table.KeyValue<String, BlockData> entry : range) {
BlockData data = new BlockData(entry.getValue().getBlockID());
result.add(data);
result.add(entry.getValue());
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,25 @@ public void testCreateContainerWithWriteChunk() throws IOException {
response.getReadChunk().getDataBuffers().getBuffersList());
Assert.assertEquals(writeChunkRequest.getWriteChunk().getData(),
responseData);
// put block
ContainerCommandRequestProto putBlockRequest =
getPutBlockRequest(writeChunkRequest);
response = hddsDispatcher.dispatch(putBlockRequest, null);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
// send list block request
ContainerCommandRequestProto listBlockRequest =
getListBlockRequest(writeChunkRequest);
response = hddsDispatcher.dispatch(listBlockRequest, null);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertEquals(1, response.getListBlock().getBlockDataList().size());
for (ContainerProtos.BlockData blockData :
response.getListBlock().getBlockDataList()) {
Assert.assertEquals(writeChunkRequest.getWriteChunk().getBlockID(),
blockData.getBlockID());
Assert.assertEquals(writeChunkRequest.getWriteChunk().getChunkData()
.getLen(), blockData.getSize());
Assert.assertEquals(1, blockData.getChunksCount());
}
} finally {
ContainerMetrics.remove();
FileUtils.deleteDirectory(new File(testDir));
Expand Down Expand Up @@ -380,4 +399,31 @@ private ContainerCommandRequestProto getReadChunkRequest(
.build();
}

private ContainerCommandRequestProto getPutBlockRequest(
ContainerCommandRequestProto writeChunkRequest) {
ContainerProtos.BlockData.Builder block =
ContainerProtos.BlockData.newBuilder()
.setSize(writeChunkRequest.getWriteChunk().getChunkData().getLen())
.setBlockID(writeChunkRequest.getWriteChunk().getBlockID())
.addChunks(writeChunkRequest.getWriteChunk().getChunkData());
return ContainerCommandRequestProto.newBuilder()
.setContainerID(writeChunkRequest.getContainerID())
.setCmdType(ContainerProtos.Type.PutBlock)
.setDatanodeUuid(writeChunkRequest.getDatanodeUuid())
.setPutBlock(ContainerProtos.PutBlockRequestProto.newBuilder()
.setBlockData(block.build())
.build())
.build();
}
Comment on lines +402 to +417
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reuse existing code from ContainerTestHelper:

/**
* Returns the PutBlockRequest for test purpose.
* @param pipeline - pipeline.
* @param writeRequest - Write Chunk Request.
* @return - Request
*/
public static ContainerCommandRequestProto getPutBlockRequest(
Pipeline pipeline, ContainerProtos.WriteChunkRequestProto writeRequest)
throws IOException {
return getPutBlockRequest(pipeline, null, writeRequest);
}
/**
* Returns the PutBlockRequest for test purpose.
* @param pipeline - pipeline.
* @param token - token.
* @param writeRequest - Write Chunk Request.
* @return - Request
*/
public static ContainerCommandRequestProto getPutBlockRequest(
Pipeline pipeline, String token,
ContainerProtos.WriteChunkRequestProto writeRequest)
throws IOException {
Builder builder = newPutBlockRequestBuilder(pipeline, writeRequest);
if (!Strings.isNullOrEmpty(token)) {
builder.setEncodedToken(token);
}
return builder.build();
}
public static Builder newPutBlockRequestBuilder(Pipeline pipeline,
ContainerProtos.WriteChunkRequestProtoOrBuilder writeRequest)
throws IOException {
LOG.trace("putBlock: {} to pipeline={}",
writeRequest.getBlockID(), pipeline);
ContainerProtos.PutBlockRequestProto.Builder putRequest =
ContainerProtos.PutBlockRequestProto.newBuilder();
BlockData blockData = new BlockData(
BlockID.getFromProtobuf(writeRequest.getBlockID()));
List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
newList.add(writeRequest.getChunkData());
blockData.setChunks(newList);
blockData.setBlockCommitSequenceId(0);
putRequest.setBlockData(blockData.getProtoBufMessage());
Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.PutBlock);
request.setContainerID(blockData.getContainerID());
request.setPutBlock(putRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request;
}


private ContainerCommandRequestProto getListBlockRequest(
ContainerCommandRequestProto writeChunkRequest) {
return ContainerCommandRequestProto.newBuilder()
.setContainerID(writeChunkRequest.getContainerID())
.setCmdType(ContainerProtos.Type.ListBlock)
.setDatanodeUuid(writeChunkRequest.getDatanodeUuid())
.setListBlock(ContainerProtos.ListBlockRequestProto.newBuilder()
.setCount(10).build())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public void setup() throws StorageContainerException {
/**
* Test that Handler handles different command types correctly.
*/
@Test
public void testHandlerCommandHandling() throws Exception {
Mockito.reset(handler);
// Test Create Container Request handling
Expand Down Expand Up @@ -208,8 +209,8 @@ public void testHandlerCommandHandling() throws Exception {
getDummyCommandRequestProto(ContainerProtos.Type.ListBlock);
KeyValueHandler
.dispatchRequest(handler, listBlockRequest, container, context);
Mockito.verify(handler, times(2)).handleUnsupportedOp(
any(ContainerCommandRequestProto.class));
Mockito.verify(handler, times(1)).handleListBlock(
any(ContainerCommandRequestProto.class), any());

// Test Read Chunk Request handling
ContainerCommandRequestProto readChunkRequest =
Expand Down Expand Up @@ -240,7 +241,7 @@ public void testHandlerCommandHandling() throws Exception {
getDummyCommandRequestProto(ContainerProtos.Type.ListChunk);
KeyValueHandler
.dispatchRequest(handler, listChunkRequest, container, context);
Mockito.verify(handler, times(3)).handleUnsupportedOp(
Mockito.verify(handler, times(2)).handleUnsupportedOp(
any(ContainerCommandRequestProto.class));

// Test Put Small File Request handling
Expand Down