HDDS-1095. OzoneManager#openKey should do multiple block allocations in a single SCM rpc call. Contributed by Mukul Kumar Singh.
mukul1987 committed Mar 12, 2019
1 parent d17e31e commit daf4660
Showing 10 changed files with 117 additions and 70 deletions.
@@ -48,14 +48,17 @@ public interface ScmBlockLocationProtocol extends Closeable {
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used for creating this block.
* @param size - size of the block.
* @param numBlocks - number of blocks.
* @param type - replication type of the blocks.
* @param factor - replication factor of the blocks.
* @param excludeList List of datanodes/containers to exclude during block
* allocation.
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
AllocatedBlock allocateBlock(long size, ReplicationType type,
ReplicationFactor factor, String owner, ExcludeList excludeList)
throws IOException;
List<AllocatedBlock> allocateBlock(long size, int numBlocks,
ReplicationType type, ReplicationFactor factor, String owner,
ExcludeList excludeList) throws IOException;

/**
* Delete blocks for a set of object keys.
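For context, here is a minimal usage sketch of the new batched signature. It is not part of this commit; the client instance, block size, replication settings, and any package locations not shown in the diff are illustrative assumptions.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;

public final class AllocateBlocksSketch {
  // Asks SCM for four blocks of a key in one rpc instead of four separate calls.
  static List<AllocatedBlock> preallocate(ScmBlockLocationProtocol scmBlockClient)
      throws IOException {
    long blockSize = 256L * 1024 * 1024;   // assumed SCM block size
    int numBlocks = 4;                     // allocated together in a single rpc
    return scmBlockClient.allocateBlock(blockSize, numBlocks,
        ReplicationType.RATIS, ReplicationFactor.THREE, "om1",
        new ExcludeList());
  }
}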
@@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
@@ -75,18 +76,23 @@ public ScmBlockLocationProtocolClientSideTranslatorPB(
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used for creating this block.
* @param size - size of the block.
* @param num - number of blocks.
* @param type - replication type of the blocks.
* @param factor - replication factor of the blocks.
* @param excludeList - exclude list while allocating blocks.
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
@Override
public AllocatedBlock allocateBlock(long size,
public List<AllocatedBlock> allocateBlock(long size, int num,
HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
String owner, ExcludeList excludeList) throws IOException {
Preconditions.checkArgument(size > 0, "block size must be greater than 0");

AllocateScmBlockRequestProto request =
AllocateScmBlockRequestProto.newBuilder()
.setSize(size)
.setNumBlocks(num)
.setType(type)
.setFactor(factor)
.setOwner(owner)
@@ -104,11 +110,17 @@ public AllocatedBlock allocateBlock(long size,
throw new IOException(response.hasErrorMessage() ?
response.getErrorMessage() : "Allocate block failed.");
}
AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
.setContainerBlockID(
ContainerBlockID.getFromProtobuf(response.getContainerBlockID()))
.setPipeline(Pipeline.getFromProtobuf(response.getPipeline()));
return builder.build();

List<AllocatedBlock> blocks = new ArrayList<>(response.getBlocksCount());
for (AllocateBlockResponse resp : response.getBlocksList()) {
AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
.setContainerBlockID(
ContainerBlockID.getFromProtobuf(resp.getContainerBlockID()))
.setPipeline(Pipeline.getFromProtobuf(resp.getPipeline()));
blocks.add(builder.build());
}

return blocks;
}

/**
@@ -198,10 +198,10 @@ public final class OzoneConfigKeys {
public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT
= "300s"; // 300s for default

public static final String OZONE_KEY_PREALLOCATION_MAXSIZE =
"ozone.key.preallocation.maxsize";
public static final long OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT
= 128 * OzoneConsts.MB;
public static final String OZONE_KEY_PREALLOCATION_BLOCKS_MAX =
"ozone.key.preallocation.max.blocks";
public static final int OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT
= 64;

public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER =
"ozone.block.deleting.limit.per.task";
@@ -22,6 +22,8 @@
import io.opentracing.Scope;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.AllocateBlockResponse;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -76,22 +78,30 @@ public AllocateScmBlockResponseProto allocateScmBlock(
try (Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol.allocateBlock",
request.getTraceID())) {
AllocatedBlock allocatedBlock =
impl.allocateBlock(request.getSize(), request.getType(),
List<AllocatedBlock> allocatedBlocks =
impl.allocateBlock(request.getSize(),
request.getNumBlocks(), request.getType(),
request.getFactor(), request.getOwner(),
ExcludeList.getFromProtoBuf(request.getExcludeList()));
if (allocatedBlock != null) {
return
AllocateScmBlockResponseProto.newBuilder()
.setContainerBlockID(allocatedBlock.getBlockID().getProtobuf())
.setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
.build();
} else {
return AllocateScmBlockResponseProto.newBuilder()

AllocateScmBlockResponseProto.Builder builder =
AllocateScmBlockResponseProto.newBuilder();

if (allocatedBlocks.size() < request.getNumBlocks()) {
return builder
.setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
.build();
}

for (AllocatedBlock block : allocatedBlocks) {
builder.addBlocks(AllocateBlockResponse.newBuilder()
.setContainerBlockID(block.getBlockID().getProtobuf())
.setPipeline(block.getPipeline().getProtobufMessage()));
}

return builder
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
22 changes: 13 additions & 9 deletions hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
@@ -38,12 +38,12 @@ import "hdds.proto";
*/
message AllocateScmBlockRequestProto {
required uint64 size = 1;
required ReplicationType type = 2;
required hadoop.hdds.ReplicationFactor factor = 3;
required string owner = 4;
optional string traceID = 5;
optional ExcludeListProto excludeList = 6;

required uint32 numBlocks = 2;
required ReplicationType type = 3;
required hadoop.hdds.ReplicationFactor factor = 4;
required string owner = 5;
optional string traceID = 6;
optional ExcludeListProto excludeList = 7;
}

/**
@@ -96,6 +96,11 @@ message DeleteScmBlockResult {
required BlockID blockID = 2;
}

message AllocateBlockResponse {
optional ContainerBlockID containerBlockID = 1;
optional hadoop.hdds.Pipeline pipeline = 2;
}

/**
* Reply from SCM for a block allocation request.
*/
@@ -107,9 +112,8 @@ message AllocateScmBlockResponseProto {
unknownFailure = 4;
}
required Error errorCode = 1;
optional ContainerBlockID containerBlockID = 2;
optional hadoop.hdds.Pipeline pipeline = 3;
optional string errorMessage = 4;
optional string errorMessage = 2;
repeated AllocateBlockResponse blocks = 3;
}

/**
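A hypothetical sketch (not from this commit) of what the new batched response looks like when built with the generated protobuf classes; the container and local IDs are made up, the ContainerBlockID field names are assumed from hdds.proto, and a real response would also set each block's pipeline. Per the server-side translator above, SCM sets errorCode to success only when every requested block was allocated; otherwise the response carries unknownFailure and no blocks.

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;

public final class BatchedResponseSketch {
  // Builds a success response carrying two allocated blocks from container 1.
  static AllocateScmBlockResponseProto twoBlockSuccess() {
    return AllocateScmBlockResponseProto.newBuilder()
        .setErrorCode(AllocateScmBlockResponseProto.Error.success)
        .addBlocks(AllocateBlockResponse.newBuilder()
            .setContainerBlockID(HddsProtos.ContainerBlockID.newBuilder()
                .setContainerID(1L).setLocalID(101L)))
        .addBlocks(AllocateBlockResponse.newBuilder()
            .setContainerBlockID(HddsProtos.ContainerBlockID.newBuilder()
                .setContainerID(1L).setLocalID(102L)))
        .build();
  }
}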
11 changes: 6 additions & 5 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1089,13 +1089,14 @@
</property>

<property>
<name>ozone.key.preallocation.maxsize</name>
<value>134217728</value>
<name>ozone.key.preallocation.max.blocks</name>
<value>64</value>
<tag>OZONE, OM, PERFORMANCE</tag>
<description>
When a new key write request is sent to OM, if a size is requested, at most
128MB of size is allocated at request time. If client needs more space for the
write, separate block allocation requests will be made.
While allocating blocks from OM, this configuration limits the maximum
number of blocks that can be allocated in a single request. This ensures
that the allocated block response does not exceed the rpc payload limit.
If the client needs more space for the write, separate block allocation
requests will be made.
</description>
</property>
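As a rough worked example (assuming the default 256 MB SCM block size, which this commit does not change), the cap of 64 blocks bounds a single preallocation to about 16 GB. The sketch below mirrors the calculation added to KeyManagerImpl further down, with a made-up request size:

public final class PreallocationBoundSketch {
  public static void main(String[] args) {
    long scmBlockSize = 256L * 1024 * 1024;        // assumed ozone.scm.block.size default
    int preallocateBlocksMax = 64;                 // ozone.key.preallocation.max.blocks
    long requestedSize = 20L * 1024 * 1024 * 1024; // client opens a 20 GB key
    int numBlocks = Math.min((int) ((requestedSize - 1) / scmBlockSize + 1),
        preallocateBlocksMax);
    System.out.println(numBlocks);                 // 80 blocks needed, capped at 64 (16 GB)
  }
}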

@@ -156,18 +156,25 @@ public void join() throws InterruptedException {
}

@Override
public AllocatedBlock allocateBlock(long size,
public List<AllocatedBlock> allocateBlock(long size, int num,
HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
String owner, ExcludeList excludeList) throws IOException {
Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("size", String.valueOf(size));
auditMap.put("type", type.name());
auditMap.put("factor", factor.name());
auditMap.put("owner", owner);
List<AllocatedBlock> blocks = new ArrayList<>(num);
boolean auditSuccess = true;
try {
return scm.getScmBlockManager()
.allocateBlock(size, type, factor, owner, excludeList);
for (int i = 0; i < num; i++) {
AllocatedBlock block = scm.getScmBlockManager()
.allocateBlock(size, type, factor, owner, excludeList);
if (block != null) {
blocks.add(block);
}
}
return blocks;
} catch (Exception ex) {
auditSuccess = false;
AUDIT.logWriteFailure(
@@ -85,8 +85,8 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE;
Expand All @@ -110,7 +110,7 @@ public class KeyManagerImpl implements KeyManager {
private final long scmBlockSize;
private final boolean useRatis;

private final long preallocateMax;
private final int preallocateBlocksMax;
private final String omId;
private final OzoneBlockTokenSecretManager secretManager;
private final boolean grpcBlockTokenEnabled;
@@ -136,9 +136,9 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
StorageUnit.BYTES);
this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY,
DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
this.preallocateMax = conf.getLong(
OZONE_KEY_PREALLOCATION_MAXSIZE,
OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
this.preallocateBlocksMax = conf.getInt(
OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
this.omId = omId;
start(conf);
this.secretManager = secretManager;
@@ -251,36 +251,45 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
return locationInfos.get(0);
}

/**
* This method avoids multiple rpc calls to SCM by allocating multiple blocks
* in one rpc call.
* @param keyInfo - key info for the key to be allocated.
* @param excludeList exclude list while allocating blocks.
* @param requestedSize requested size to be allocated.
* @return list of location infos for the allocated blocks.
* @throws IOException
*/
private List<OmKeyLocationInfo> allocateBlock(OmKeyInfo keyInfo,
ExcludeList excludeList, long requestedSize) throws IOException {
int numBlocks = (int) ((requestedSize - 1) / scmBlockSize + 1);
int numBlocks = Math.min((int) ((requestedSize - 1) / scmBlockSize + 1),
preallocateBlocksMax);
List<OmKeyLocationInfo> locationInfos = new ArrayList<>(numBlocks);
while (requestedSize > 0) {
long allocateSize = Math.min(requestedSize, scmBlockSize);
AllocatedBlock allocatedBlock;
try {
allocatedBlock = scmBlockClient
.allocateBlock(allocateSize, keyInfo.getType(), keyInfo.getFactor(),
omId, excludeList);
} catch (SCMException ex) {
if (ex.getResult()
.equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
}
throw ex;
String remoteUser = getRemoteUser().getShortUserName();
List<AllocatedBlock> allocatedBlocks;
try {
allocatedBlocks = scmBlockClient
.allocateBlock(scmBlockSize, numBlocks, keyInfo.getType(),
keyInfo.getFactor(), omId, excludeList);
} catch (SCMException ex) {
if (ex.getResult()
.equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
}
throw ex;
}
for (AllocatedBlock allocatedBlock : allocatedBlocks) {
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setLength(scmBlockSize)
.setOffset(0);
if (grpcBlockTokenEnabled) {
String remoteUser = getRemoteUser().getShortUserName();
builder.setToken(secretManager
.generateToken(remoteUser, allocatedBlock.getBlockID().toString(),
getAclForUser(remoteUser), scmBlockSize));
}
locationInfos.add(builder.build());
requestedSize -= allocateSize;
}
return locationInfos;
}
@@ -339,7 +348,6 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
ReplicationFactor factor = args.getFactor();
ReplicationType type = args.getType();
long currentTime = Time.monotonicNowNanos();
long requestedSize = Math.min(preallocateMax, args.getDataSize());
OmKeyInfo keyInfo;
String openKey;
long openVersion;
@@ -457,9 +465,9 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
// the point, if client needs more blocks, client can always call
// allocateBlock. But if requested size is not 0, OM will preallocate
// some blocks and piggyback to client, to save RPC calls.
if (requestedSize > 0) {
if (args.getDataSize() > 0) {
List<OmKeyLocationInfo> locationInfos =
allocateBlock(keyInfo, new ExcludeList(), requestedSize);
allocateBlock(keyInfo, new ExcludeList(), args.getDataSize());
keyInfo.appendNewBlocks(locationInfos);
}
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
@@ -40,6 +40,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

@@ -114,7 +115,7 @@ public ScmBlockLocationTestIngClient(String clusterID, String scmId,
* @throws IOException
*/
@Override
public AllocatedBlock allocateBlock(long size,
public List<AllocatedBlock> allocateBlock(long size, int num,
HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
String owner, ExcludeList excludeList) throws IOException {
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
@@ -125,7 +126,7 @@ public AllocatedBlock allocateBlock(long size,
new AllocatedBlock.Builder()
.setContainerBlockID(new ContainerBlockID(containerID, localID))
.setPipeline(pipeline);
return abb.build();
return Collections.singletonList(abb.build());
}

private Pipeline createPipeline(DatanodeDetails datanode) {
Expand Down
@@ -86,13 +86,14 @@ public static void setUp() throws Exception {
scmBlockSize = (long) conf
.getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
StorageUnit.BYTES);
conf.setLong(OZONE_KEY_PREALLOCATION_MAXSIZE, scmBlockSize * 10);
conf.setLong(OZONE_KEY_PREALLOCATION_BLOCKS_MAX, 10);

keyManager =
new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager, conf,
"om1", null);
Mockito.when(mockScmBlockLocationProtocol
.allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class),
.allocateBlock(Mockito.anyLong(), Mockito.anyInt(),
Mockito.any(ReplicationType.class),
Mockito.any(ReplicationFactor.class), Mockito.anyString(),
Mockito.any(ExcludeList.class))).thenThrow(
new SCMException("ChillModePrecheck failed for allocateBlock",