Skip to content

Commit

Permalink
HDFS-6133. Add a feature for replica pinning so that a pinned replica…
Browse files Browse the repository at this point in the history
… will not be moved by Balancer/Mover. Contributed by zhaoyunjiong
  • Loading branch information
Tsz-Wo Nicholas Sze committed Feb 11, 2015
1 parent 50625e6 commit 085b1e2
Show file tree
Hide file tree
Showing 21 changed files with 271 additions and 36 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -332,6 +332,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode Protocol
changes. (Xiaoyu Yao via Arpit Agarwal)

HDFS-6133. Add a feature for replica pinning so that a pinned replica
will not be moved by Balancer/Mover. (zhaoyunjiong via szetszwo)

IMPROVEMENTS

HDFS-7055. Add tracing to DFSInputStream (cmccabe)
Expand Down
Expand Up @@ -778,4 +778,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// 10 days
public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
TimeUnit.DAYS.toMillis(10);
public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED =
"dfs.datanode.block-pinning.enabled";
public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT =
false;
}
Expand Up @@ -1443,11 +1443,13 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
ExtendedBlock blockCopy = new ExtendedBlock(block);
blockCopy.setNumBytes(blockSize);

boolean[] targetPinnings = getPinnings(nodes);
// send the request
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS,
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
(targetPinnings == null ? false : targetPinnings[0]), targetPinnings);

// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
Expand Down Expand Up @@ -1535,6 +1537,24 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
}
}

private boolean[] getPinnings(DatanodeInfo[] nodes) {
if (favoredNodes == null) {
return null;
} else {
boolean[] pinnings = new boolean[nodes.length];
for (int i = 0; i < nodes.length; i++) {
pinnings[i] = false;
for (int j = 0; j < favoredNodes.length; j++) {
if (nodes[i].getXferAddrWithHostname().equals(favoredNodes[j])) {
pinnings[i] = true;
break;
}
}
}
return pinnings;
}
}

private LocatedBlock locateFollowingBlock(long start,
DatanodeInfo[] excludedNodes) throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
Expand Down
Expand Up @@ -352,9 +352,10 @@ public FSDataOutputStream create(Path f, FsPermission permission,
* Progressable)} with the addition of favoredNodes that is a hint to
* where the namenode should place the file blocks.
* The favored nodes hint is not persisted in HDFS. Hence it may be honored
* at the creation time only. HDFS could move the blocks during balancing or
* replication, to move the blocks from favored nodes. A value of null means
* no favored nodes for this create
* at the creation time only. And with favored nodes, blocks will be pinned
* on the datanodes to prevent balancing move the block. HDFS could move the
* blocks during replication, to move the blocks from favored nodes. A value
* of null means no favored nodes for this create
*/
public HdfsDataOutputStream create(final Path f,
final FsPermission permission, final boolean overwrite,
Expand Down
Expand Up @@ -92,6 +92,8 @@ public void readBlock(final ExtendedBlock blk,
* @param minBytesRcvd minimum number of bytes received.
* @param maxBytesRcvd maximum number of bytes received.
* @param latestGenerationStamp the latest generation stamp of the block.
* @param pinning whether to pin the block, so Balancer won't move it.
* @param targetPinnings whether to pin the block on target datanode
*/
public void writeBlock(final ExtendedBlock blk,
final StorageType storageType,
Expand All @@ -107,7 +109,9 @@ public void writeBlock(final ExtendedBlock blk,
final long latestGenerationStamp,
final DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException;
final boolean allowLazyPersist,
final boolean pinning,
final boolean[] targetPinnings) throws IOException;
/**
* Transfer a block to another datanode.
* The block stage must be
Expand Down
Expand Up @@ -149,7 +149,9 @@ private void opWriteBlock(DataInputStream in) throws IOException {
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()),
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
(proto.hasPinning() ? proto.getPinning(): false),
(PBHelper.convertBooleanList(proto.getTargetPinningsList())));
} finally {
if (traceScope != null) traceScope.close();
}
Expand Down
Expand Up @@ -129,7 +129,9 @@ public void writeBlock(final ExtendedBlock blk,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
final boolean allowLazyPersist,
final boolean pinning,
final boolean[] targetPinnings) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);

Expand All @@ -148,7 +150,9 @@ public void writeBlock(final ExtendedBlock blk,
.setLatestGenerationStamp(latestGenerationStamp)
.setRequestedChecksum(checksumProto)
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.setAllowLazyPersist(allowLazyPersist);
.setAllowLazyPersist(allowLazyPersist)
.setPinning(pinning)
.addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));

if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source));
Expand Down
Expand Up @@ -2960,4 +2960,25 @@ public static FileEncryptionInfo convert(
ezKeyVersionName);
}

public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
List<Boolean> pinnings = new ArrayList<Boolean>();
if (targetPinnings == null) {
pinnings.add(Boolean.FALSE);
} else {
for (; idx < targetPinnings.length; ++idx) {
pinnings.add(Boolean.valueOf(targetPinnings[idx]));
}
}
return pinnings;
}

public static boolean[] convertBooleanList(
List<Boolean> targetPinningsList) {
final boolean[] targetPinnings = new boolean[targetPinningsList.size()];
for (int i = 0; i < targetPinningsList.size(); i++) {
targetPinnings[i] = targetPinningsList.get(i);
}
return targetPinnings;
}

}
Expand Up @@ -132,6 +132,8 @@ class BlockReceiver implements Closeable {
private long lastResponseTime = 0;
private boolean isReplaceBlock = false;
private DataOutputStream replyOut = null;

private boolean pinning;

BlockReceiver(final ExtendedBlock block, final StorageType storageType,
final DataInputStream in,
Expand All @@ -141,7 +143,8 @@ class BlockReceiver implements Closeable {
final String clientname, final DatanodeInfo srcDataNode,
final DataNode datanode, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
final boolean allowLazyPersist,
final boolean pinning) throws IOException {
try{
this.block = block;
this.in = in;
Expand All @@ -165,12 +168,14 @@ class BlockReceiver implements Closeable {
this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;

this.pinning = pinning;
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ": " + block
+ "\n isClient =" + isClient + ", clientname=" + clientname
+ "\n isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
+ "\n inAddr=" + inAddr + ", myAddr=" + myAddr
+ "\n cachingStrategy = " + cachingStrategy
+ "\n pinning=" + pinning
);
}

Expand Down Expand Up @@ -1279,6 +1284,11 @@ private void finalizeBlock(long startTime) throws IOException {
: 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);

if (pinning) {
datanode.data.setPinning(block);
}

datanode.closeBlock(
block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
if (ClientTraceLog.isInfoEnabled() && isClient) {
Expand Down
Expand Up @@ -2068,7 +2068,7 @@ public void run() {
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
clientname, targets, targetStorageTypes, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
false);
false, false, null);

// send data & checksum
blockSender.sendBlock(out, unbufOut, null);
Expand Down
Expand Up @@ -581,7 +581,9 @@ public void writeBlock(final ExtendedBlock block,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
final boolean allowLazyPersist,
final boolean pinning,
final boolean[] targetPinnings) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0;
Expand All @@ -594,14 +596,14 @@ public void writeBlock(final ExtendedBlock block,
throw new IOException(stage + " does not support multiple targets "
+ Arrays.asList(targets));
}

if (LOG.isDebugEnabled()) {
LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname
+ "\n block =" + block + ", newGs=" + latestGenerationStamp
+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
+ "\n targets=" + Arrays.asList(targets)
+ "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
);
+ ", pinning=" + pinning);
LOG.debug("isDatanode=" + isDatanode
+ ", isClient=" + isClient
+ ", isTransfer=" + isTransfer);
Expand Down Expand Up @@ -643,7 +645,7 @@ public void writeBlock(final ExtendedBlock block,
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy, allowLazyPersist);
cachingStrategy, allowLazyPersist, pinning);

storageUuid = blockReceiver.getStorageUuid();
} else {
Expand Down Expand Up @@ -686,10 +688,19 @@ public void writeBlock(final ExtendedBlock block,
mirrorIn = new DataInputStream(unbufMirrorIn);

// Do not propagate allowLazyPersist to downstream DataNodes.
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
if (targetPinnings != null && targetPinnings.length > 0) {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy, false);
latestGenerationStamp, requestedChecksum, cachingStrategy,
false, targetPinnings[0], targetPinnings);
} else {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
false, false, targetPinnings);
}

mirrorOut.flush();

Expand Down Expand Up @@ -949,7 +960,14 @@ public void copyBlock(final ExtendedBlock block,
}

}


if (datanode.data.getPinning(block)) {
String msg = "Not able to copy block " + block.getBlockId() + " " +
"to " + peer.getRemoteAddressString() + " because it's pinned ";
LOG.info(msg);
sendResponse(ERROR, msg);
}

if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
String msg = "Not able to copy block " + block.getBlockId() + " " +
"to " + peer.getRemoteAddressString() + " because threads " +
Expand Down Expand Up @@ -1109,7 +1127,7 @@ public void replaceBlock(final ExtendedBlock block,
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
CachingStrategy.newDropBehind(), false);
CachingStrategy.newDropBehind(), false, false);

// receive a block
blockReceiver.receiveBlock(null, null, replyOut, null,
Expand Down
Expand Up @@ -522,4 +522,17 @@ public void onCompleteLazyPersist(String bpId, long blockId,
*/
public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
StorageType targetStorageType) throws IOException;

/**
* Set a block to be pinned on this datanode so that it cannot be moved
* by Balancer/Mover.
*
* It is a no-op when dfs.datanode.block-pinning.enabled is set to false.
*/
public void setPinning(ExtendedBlock block) throws IOException;

/**
* Check whether the block was pinned
*/
public boolean getPinning(ExtendedBlock block) throws IOException;
}
Expand Up @@ -50,6 +50,10 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
Expand Down Expand Up @@ -239,6 +243,10 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
// Used for synchronizing access to usage stats
private final Object statsLock = new Object();

final LocalFileSystem localFS;

private boolean blockPinningEnabled;

/**
* An FSDataset has a directory where it loads its data files.
*/
Expand Down Expand Up @@ -299,6 +307,10 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
lazyWriter = new Daemon(new LazyWriter(conf));
lazyWriter.start();
registerMBean(datanode.getDatanodeUuid());
localFS = FileSystem.getLocal(conf);
blockPinningEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
}

private void addVolume(Collection<StorageLocation> dataLocations,
Expand Down Expand Up @@ -2842,5 +2854,33 @@ public void stop() {
shouldRun = false;
}
}

@Override
public void setPinning(ExtendedBlock block) throws IOException {
if (!blockPinningEnabled) {
return;
}

File f = getBlockFile(block);
Path p = new Path(f.getAbsolutePath());

FsPermission oldPermission = localFS.getFileStatus(
new Path(f.getAbsolutePath())).getPermission();
//sticky bit is used for pinning purpose
FsPermission permission = new FsPermission(oldPermission.getUserAction(),
oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
localFS.setPermission(p, permission);
}

@Override
public boolean getPinning(ExtendedBlock block) throws IOException {
if (!blockPinningEnabled) {
return false;
}
File f = getBlockFile(block);

FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
return fss.getPermission().getStickyBit();
}
}

Expand Up @@ -123,6 +123,9 @@ message OpWriteBlockProto {
* to ignore this hint.
*/
optional bool allowLazyPersist = 13 [default = false];
//whether to pin the block, so Balancer won't move it.
optional bool pinning = 14 [default = false];
repeated bool targetPinnings = 15;
}

message OpTransferBlockProto {
Expand Down
Expand Up @@ -2264,4 +2264,10 @@
</description>
</property>

<property>
<name>dfs.datanode.block-pinning.enabled</name>
<value>false</value>
<description>Whether pin blocks on favored DataNode.</description>
</property>

</configuration>

0 comments on commit 085b1e2

Please sign in to comment.