Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@
* contains a Pipeline and the key.
*/
public final class AllocatedBlock {
private Pipeline pipeline;
private ContainerBlockID containerBlockID;
private final Pipeline pipeline;
private final ContainerBlockID containerBlockID;
private final long size;

/**
* Builder for AllocatedBlock.
*/
public static class Builder {
private Pipeline pipeline;
private ContainerBlockID containerBlockID;
private long size;

public Builder setPipeline(Pipeline p) {
this.pipeline = p;
Expand All @@ -46,14 +48,21 @@ public Builder setContainerBlockID(ContainerBlockID blockId) {
return this;
}

public Builder setSize(long s) {
this.size = s;
return this;
}

public AllocatedBlock build() {
return new AllocatedBlock(pipeline, containerBlockID);
return new AllocatedBlock(pipeline, containerBlockID, size);
}
}

private AllocatedBlock(Pipeline pipeline, ContainerBlockID containerBlockID) {
private AllocatedBlock(Pipeline pipeline, ContainerBlockID containerBlockID,
long size) {
this.pipeline = pipeline;
this.containerBlockID = containerBlockID;
this.size = size;
}

public Pipeline getPipeline() {
Expand All @@ -63,4 +72,12 @@ public Pipeline getPipeline() {
public ContainerBlockID getBlockID() {
return containerBlockID;
}

public boolean hasSize() {
return size > 0;
}

public long getSize() {
return size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.protocol;

import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
Expand Down Expand Up @@ -65,8 +66,8 @@ public interface ScmBlockLocationProtocol extends Closeable {
default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
ReplicationType type, ReplicationFactor factor, String owner,
ExcludeList excludeList) throws IOException, TimeoutException {
return allocateBlock(size, numBlocks, ReplicationConfig
.fromProtoTypeAndFactor(type, factor), owner, excludeList);
return allocateBlock(size * numBlocks, ReplicationConfig
.fromProtoTypeAndFactor(type, factor), owner, excludeList, size);
}

/**
Expand All @@ -83,9 +84,53 @@ default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
List<AllocatedBlock> allocateBlock(long size, int numBlocks,
@Deprecated
default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
ReplicationConfig replicationConfig, String owner,
ExcludeList excludeList) throws IOException {
int numData = replicationConfig instanceof ECReplicationConfig ?
((ECReplicationConfig) replicationConfig).getData() : 1;
return allocateBlock(numBlocks * size * numData, replicationConfig, owner,
excludeList, size);
}

/**
* Asks SCM to allocate blocks of specified replication config.
* SCM responds with a list of blocks, including the size and
* location of each block. SCM will decide the max block size.
*
* @param requestedSize - total size requested.
* @param replicationConfig - replicationConfiguration
* @param owner - service owner of the new block
* @param excludeList List of datanodes/containers to exclude during
* block allocation.
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
default List<AllocatedBlock> allocateBlock(long requestedSize,
ReplicationConfig replicationConfig, String owner,
ExcludeList excludeList) throws IOException {
return allocateBlock(requestedSize, replicationConfig,
owner, excludeList, 0);
}

/**
* Asks SCM to allocate blocks of specified replication config.
* SCM responds with a list of blocks, including the size and
* location of each block. Client can specify the max block size.
*
* @param requestedSize - total size requested.
* @param replicationConfig - replicationConfiguration
* @param owner - service owner of the new block
* @param excludeList List of datanodes/containers to exclude during
* block allocation.
* @param blockSize - client specified block size, 0 for default.
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
List<AllocatedBlock> allocateBlock(long requestedSize,
ReplicationConfig replicationConfig, String owner,
ExcludeList excludeList) throws IOException;
ExcludeList excludeList, long blockSize) throws IOException;

/**
* Delete blocks for a set of object keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,25 +138,26 @@ private SCMBlockLocationResponse handleError(SCMBlockLocationResponse resp)
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used creating this block.
*
* @param size - size of the block.
* @param num - number of blocks.
* @param requestedSize - total size requested.
* @param replicationConfig - replication configuration of the blocks.
* @param excludeList - exclude list while allocating blocks.
* @param blockSize - client specified block size, 0 for default.
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
@Override
public List<AllocatedBlock> allocateBlock(
long size, int num,
ReplicationConfig replicationConfig,
String owner, ExcludeList excludeList
) throws IOException {
Preconditions.checkArgument(size > 0, "block size must be greater than 0");
long requestedSize, ReplicationConfig replicationConfig,
String owner, ExcludeList excludeList, long blockSize)
throws IOException {
Preconditions.checkArgument(requestedSize > 0,
"Requested size must be greater than 0");

final AllocateScmBlockRequestProto.Builder requestBuilder =
AllocateScmBlockRequestProto.newBuilder()
.setSize(size)
.setNumBlocks(num)
.setSize(blockSize)
.setNumBlocks(0) // deprecated required field
.setRequestedSize(requestedSize)
.setType(replicationConfig.getReplicationType())
.setOwner(owner)
.setExcludeList(excludeList.getProtoBuf());
Expand Down Expand Up @@ -201,7 +202,8 @@ public List<AllocatedBlock> allocateBlock(
AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
.setContainerBlockID(
ContainerBlockID.getFromProtobuf(resp.getContainerBlockID()))
.setPipeline(Pipeline.getFromProtobuf(resp.getPipeline()));
.setPipeline(Pipeline.getFromProtobuf(resp.getPipeline()))
.setSize(resp.getSize());
blocks.add(builder.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,19 @@ enum Status {
* Request send to SCM asking allocate block of specified size.
*/
message AllocateScmBlockRequestProto {
required uint64 size = 1;
required uint32 numBlocks = 2;
required uint64 size = 1; // client specified block size, 0 means None
required uint32 numBlocks = 2 [deprecated = true];
required ReplicationType type = 3;
optional hadoop.hdds.ReplicationFactor factor = 4;
required string owner = 5;
optional ExcludeListProto excludeList = 7;

//used for EC replicaiton instead of the replication factor
//used for EC replication instead of the replication factor
optional hadoop.hdds.ECReplicationConfig ecReplicationConfig = 8;

// If requestedSize is greater than 0, size and numBlocks can be ignored.
// It is up to SCM to decide how many blocks to allocate.
optional uint64 requestedSize = 9;
}

/**
Expand Down Expand Up @@ -199,6 +202,7 @@ message DeleteScmBlockResult {
message AllocateBlockResponse {
optional ContainerBlockID containerBlockID = 1;
optional hadoop.hdds.Pipeline pipeline = 2;
optional uint64 size = 3;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public AllocatedBlock allocateBlock(final long size,
size, replicationConfig, owner, excludeList);

if (containerInfo != null) {
return newBlock(containerInfo);
return newBlock(containerInfo, size);
}
// we have tried all strategies we know and but somehow we are not able
// to get a container for this block. Log that info and return a null.
Expand All @@ -192,9 +192,10 @@ public AllocatedBlock allocateBlock(final long size,
* newBlock - returns a new block assigned to a container.
*
* @param containerInfo - Container Info.
* @param size - Block Size.
* @return AllocatedBlock
*/
private AllocatedBlock newBlock(ContainerInfo containerInfo)
private AllocatedBlock newBlock(ContainerInfo containerInfo, long size)
throws NotLeaderException, TimeoutException {
try {
final Pipeline pipeline = pipelineManager
Expand All @@ -203,10 +204,11 @@ private AllocatedBlock newBlock(ContainerInfo containerInfo)
long containerID = containerInfo.getContainerID();
AllocatedBlock.Builder abb = new AllocatedBlock.Builder()
.setContainerBlockID(new ContainerBlockID(containerID, localID))
.setPipeline(pipeline);
.setPipeline(pipeline)
.setSize(size);
if (LOG.isTraceEnabled()) {
LOG.trace("New block allocated : {} Container ID: {}", localID,
containerID);
LOG.trace("New block allocated : {} Container ID: {} Block Size: {}",
localID, containerID, size);
}
pipelineManager.incNumBlocksAllocatedMetric(pipeline.getId());
return abb.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,22 +188,34 @@ private Status exceptionToResponseStatus(IOException ex) {
public AllocateScmBlockResponseProto allocateScmBlock(
AllocateScmBlockRequestProto request, int clientVersion)
throws IOException {

final int numData = request.hasEcReplicationConfig() ?
request.getEcReplicationConfig().getData() : 1;

final long requestedSize = request.hasRequestedSize() ?
request.getRequestedSize() :
request.getNumBlocks() * request.getSize() * numData;

List<AllocatedBlock> allocatedBlocks =
impl.allocateBlock(request.getSize(),
request.getNumBlocks(),
impl.allocateBlock(
requestedSize,
ReplicationConfig.fromProto(
request.getType(),
request.getFactor(),
request.getEcReplicationConfig()),
request.getOwner(),
ExcludeList.getFromProtoBuf(request.getExcludeList()));
request.getOwner(),
ExcludeList.getFromProtoBuf(request.getExcludeList()),
request.getSize());

AllocateScmBlockResponseProto.Builder builder =
AllocateScmBlockResponseProto.newBuilder();

if (allocatedBlocks.size() < request.getNumBlocks()) {
throw new SCMException("Allocated " + allocatedBlocks.size() +
" blocks. Requested " + request.getNumBlocks() + " blocks",
final long allocatedSize = numData * allocatedBlocks.stream()
.mapToLong(AllocatedBlock::getSize)
.sum();
if (allocatedSize < requestedSize) {
throw new SCMException("Allocated " + allocatedSize +
" bytes. Requested " + requestedSize + " bytes",
SCMException.ResultCodes.FAILED_TO_ALLOCATE_ENOUGH_BLOCKS);
}
for (AllocatedBlock block : allocatedBlocks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
Expand Down Expand Up @@ -70,6 +73,9 @@
import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -91,6 +97,7 @@ public class SCMBlockProtocolServer implements
private final InetSocketAddress blockRpcAddress;
private final ProtocolMessageMetrics<ProtocolMessageEnum>
protocolMessageMetrics;
private final long scmBlockSize;

/**
* The RPC server that listens to requests from block service clients.
Expand All @@ -103,6 +110,9 @@ public SCMBlockProtocolServer(OzoneConfiguration conf,
conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
OZONE_SCM_HANDLER_COUNT_DEFAULT);

this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);

RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
ProtobufRpcEngine.class);

Expand Down Expand Up @@ -171,27 +181,41 @@ public void join() throws InterruptedException {
getBlockRpcServer().join();
}

@VisibleForTesting
public long getScmBlockSize() {
return scmBlockSize;
}

@Override
public List<AllocatedBlock> allocateBlock(
long size, int num,
ReplicationConfig replicationConfig,
String owner, ExcludeList excludeList
long requestedSize, ReplicationConfig replicationConfig,
String owner, ExcludeList excludeList, long blockSize
) throws IOException {
Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("size", String.valueOf(size));
auditMap.put("num", String.valueOf(num));
auditMap.put("requestedSize", String.valueOf(requestedSize));
auditMap.put("blockSize", String.valueOf(blockSize));
auditMap.put("replication", replicationConfig.toString());
auditMap.put("owner", owner);
List<AllocatedBlock> blocks = new ArrayList<>(num);

if (blockSize <= 0) {
blockSize = scmBlockSize;
}
int numData = replicationConfig instanceof ECReplicationConfig ?
((ECReplicationConfig) replicationConfig).getData() : 1;
int num = (int) ((requestedSize - 1) / (blockSize * numData) + 1);

if (LOG.isDebugEnabled()) {
LOG.debug("Allocating {} blocks of size {}, with {}",
num, size, excludeList);
LOG.debug("Requested Size {} replicationConfig {}," +
"allocating {} blocks (or block groups) of size {}, with {}",
requestedSize, replicationConfig,
num, numData * blockSize, excludeList);
}

List<AllocatedBlock> blocks = new ArrayList<>(num);
try {
for (int i = 0; i < num; i++) {
AllocatedBlock block = scm.getScmBlockManager()
.allocateBlock(size, replicationConfig, owner, excludeList);
.allocateBlock(blockSize, replicationConfig, owner, excludeList);
if (block != null) {
blocks.add(block);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,11 @@ public BlockManager getScmBlockManager() {
return scmBlockManager;
}

@VisibleForTesting
public void setScmBlockManager(BlockManager blockManager) {
this.scmBlockManager = blockManager;
}

@VisibleForTesting
public SCMSafeModeManager getScmSafeModeManager() {
return scmSafeModeManager;
Expand Down
Loading