
Commit 57a84c0

HDFS-7369. Erasure coding: distribute recovery work for striped blocks to DataNode. Contributed by Zhe Zhang.
zhe-thoughts authored and Zhe Zhang committed May 26, 2015
1 parent e3dbfeb commit 57a84c0
Showing 11 changed files with 484 additions and 108 deletions.
@@ -86,4 +86,9 @@ public void convertLastBlockToUC(BlockInfo lastBlock,
* @return whether the block collection is under construction.
*/
public boolean isUnderConstruction();

/**
* @return whether the block collection is in striping format
*/
public boolean isStriped();
}

Large diffs are not rendered by default.

@@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Arrays;

import com.google.common.annotations.VisibleForTesting;

@@ -41,6 +42,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -97,6 +99,33 @@ public static class BlockTargetPair {
}
}

/** Block, source nodes, target nodes and missing block indices of an erasure coding recovery task */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public static class BlockECRecoveryInfo {
public final ExtendedBlock block;
public final DatanodeDescriptor[] sources;
public final DatanodeStorageInfo[] targets;
public final short[] missingBlockIndices;

BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources,
DatanodeStorageInfo[] targets, short[] missingBlockIndices) {
this.block = block;
this.sources = sources;
this.targets = targets;
this.missingBlockIndices = missingBlockIndices;
}

@Override
public String toString() {
return new StringBuilder().append("BlockECRecoveryInfo(\n ").
append("Recovering ").append(block).
append(" From: ").append(Arrays.asList(sources)).
append(" To: ").append(Arrays.asList(targets)).append(")\n").
toString();
}
}
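
For intuition, a single recovery task might describe an RS(6,3) block group that lost its internal block at index 2: nodes holding surviving blocks act as sources (any six of the eight survivors suffice for decoding) and a newly chosen node is the target. A standalone sketch with purely illustrative values (the real class uses ExtendedBlock, DatanodeDescriptor and DatanodeStorageInfo):

  // Hypothetical example of the state one task carries; all values are
  // illustrative, not taken from this commit.
  long blockGroupId = 1073741825L;                // ID of the striped block group
  String[] sources = {"dn1", "dn2", "dn4", "dn5", "dn6", "dn7"}; // hold survivors
  String[] targets = {"dn9"};                     // will store the rebuilt block
  short[] missingBlockIndices = {2};              // indices to reconstruct
  System.out.println("Recovering " + blockGroupId
      + " From: " + java.util.Arrays.toString(sources)
      + " To: " + java.util.Arrays.toString(targets));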

/** A BlockTargetPair queue. */
private static class BlockQueue<E> {
private final Queue<E> blockq = new LinkedList<E>();
@@ -217,12 +246,17 @@ public CachedBlocksList getPendingUncached() {
private long bandwidth;

/** A queue of blocks to be replicated by this datanode */
private final BlockQueue<BlockTargetPair> replicateBlocks =
new BlockQueue<>();
/** A queue of blocks to be erasure coded by this datanode */
private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
new BlockQueue<>();
/** A queue of blocks to be recovered by this datanode */
private final BlockQueue<BlockInfoContiguousUnderConstruction>
recoverBlocks = new BlockQueue<>();
/** A set of blocks to be invalidated by this datanode */
private final LightWeightHashSet<Block> invalidateBlocks =
new LightWeightHashSet<>();

/* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger
@@ -375,6 +409,7 @@ public void clearBlockQueues() {
this.invalidateBlocks.clear();
this.recoverBlocks.clear();
this.replicateBlocks.clear();
this.erasurecodeBlocks.clear();
}
// pendingCached, cached, and pendingUncached are protected by the
// FSN lock.
@@ -596,6 +631,20 @@ void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
replicateBlocks.offer(new BlockTargetPair(block, targets));
}

/**
* Store block erasure coding work.
*/
void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
DatanodeStorageInfo[] targets, short[] missingBlockIndices) {
assert(block != null && sources != null && sources.length > 0);
BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
missingBlockIndices);
erasurecodeBlocks.offer(task);
BlockManager.LOG.debug("Adding block recovery task " + task +
" to " + getName() + ", current queue size is " +
erasurecodeBlocks.size());
}

/**
* Store block recovery work.
*/
@@ -627,6 +676,13 @@ int getNumberOfBlocksToBeReplicated() {
return PendingReplicationWithoutTargets + replicateBlocks.size();
}

/**
* The number of work items that are pending to be erasure coded
*/
int getNumberOfBlocksToBeErasureCoded() {
return erasurecodeBlocks.size();
}

/**
* The number of block invalidation items that are pending to
* be sent to the datanode
@@ -641,6 +697,10 @@ public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
return replicateBlocks.poll(maxTransfers);
}

public List<BlockECRecoveryInfo> getErasureCodeCommand(int maxTransfers) {
return erasurecodeBlocks.poll(maxTransfers);
}
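
Both getReplicationCommand and getErasureCodeCommand rely on the same offer/poll hand-off: the NameNode's scheduling code offers work items, and each heartbeat polls off at most maxTransfers of them. A minimal standalone sketch of that pattern, assuming (as the null checks in DatanodeManager below suggest) that poll returns null when the queue is empty:

  import java.util.ArrayList;
  import java.util.LinkedList;
  import java.util.List;
  import java.util.Queue;

  // Standalone sketch of the BlockQueue hand-off; not the actual HDFS class.
  class TaskQueue<E> {
    private final Queue<E> blockq = new LinkedList<>();

    // Producer side: the NameNode scheduler enqueues a work item.
    synchronized boolean offer(E e) {
      return blockq.offer(e);
    }

    // Consumer side: a heartbeat drains at most numItems work items;
    // returns null when nothing is queued.
    synchronized List<E> poll(int numItems) {
      if (numItems <= 0 || blockq.isEmpty()) {
        return null;
      }
      List<E> results = new ArrayList<>();
      for (; numItems > 0 && !blockq.isEmpty(); numItems--) {
        results.add(blockq.poll());
      }
      return results;
    }
  }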

public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
List<BlockInfoContiguousUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
if(blocks == null)
@@ -841,6 +901,10 @@ public String dumpDatanode() {
if (repl > 0) {
sb.append(" ").append(repl).append(" blocks to be replicated;");
}
int ec = erasurecodeBlocks.size();
if (ec > 0) {
sb.append(" ").append(ec).append(" blocks to be erasure coded;");
}
int inval = invalidateBlocks.size();
if (inval > 0) {
sb.append(" ").append(inval).append(" blocks to be invalidated;");
@@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -1349,7 +1350,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
@@ -1387,10 +1388,10 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
final List<DatanodeStorageInfo> recoveryLocations =
new ArrayList<>(storages.length);
for (DatanodeStorageInfo storage : storages) {
if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
recoveryLocations.add(storage);
}
}
// If we are performing a truncate recovery then set recovery fields
@@ -1429,14 +1430,21 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
return new DatanodeCommand[] { brCommand };
}

final List<DatanodeCommand> cmds = new ArrayList<>();
//check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers);
if (pendingList != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
}
//check pending erasure coding tasks
List<BlockECRecoveryInfo> pendingECList =
nodeinfo.getErasureCodeCommand(maxTransfers);
if (pendingECList != null) {
cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_CODEC,
pendingECList));
}
//check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
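
The DataNode-side handling of DNA_CODEC is not part of this commit. Presumably the DataNode's command processor grows a case along these lines (a hypothetical sketch: transferBlocks and recover are illustrative names, and ecTasks is read directly only for brevity):

  // Hypothetical sketch of DataNode-side dispatch; not in this commit.
  void processCommand(DatanodeCommand cmd) throws IOException {
    switch (cmd.getAction()) {
    case DatanodeProtocol.DNA_TRANSFER:
      transferBlocks((BlockCommand) cmd);  // existing replication path
      break;
    case DatanodeProtocol.DNA_CODEC:
      // New path: reconstruct missing striped blocks; see the recovery
      // sketch after BlockECRecoveryCommand below.
      for (BlockECRecoveryInfo task : ((BlockECRecoveryCommand) cmd).ecTasks) {
        recover(task);
      }
      break;
    default:
      break;                               // other actions unchanged
    }
  }
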
@@ -419,7 +419,8 @@ public short getPreferredBlockReplication() {
}
max = maxInSnapshot > max ? maxInSnapshot : max;
}
return isStriped() ?
HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : max;
}
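
Under the branch's fixed schema (presumably 6 data plus 3 parity blocks), a striped file therefore reports 6 + 3 = 9 expected storages, regardless of its configured replication factor.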

/** Set the replication factor of this file. */
@@ -1107,11 +1108,12 @@ boolean isBlockInLatestSnapshot(BlockInfoContiguous block) {
Arrays.asList(snapshotBlocks).contains(block);
}

/**
* @return true if the file is in the striping layout.
*/
// TODO: move erasure coding policy to file XAttr (HDFS-7337)
@VisibleForTesting
@Override
public boolean isStriped() {
return getStoragePolicyID() == HdfsConstants.EC_STORAGE_POLICY_ID;
}
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;

import com.google.common.base.Joiner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;

import java.util.Collection;

/**
* A BlockECRecoveryCommand is an instruction to a DataNode to reconstruct a
* striped block group with missing blocks.
*
* Upon receiving this command, the DataNode pulls data from other DataNodes
* hosting blocks in this group and reconstructs the lost blocks through codec
* calculation.
*
* After the reconstruction, the DataNode pushes the reconstructed blocks to
* their final destinations if necessary (e.g., the destination is different
* from the reconstruction node, or multiple blocks in a group are to be
* reconstructed).
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockECRecoveryCommand extends DatanodeCommand {
final Collection<BlockECRecoveryInfo> ecTasks;

/**
* Create BlockECRecoveryCommand from a collection of
* {@link BlockECRecoveryInfo}, each representing a recovery task
*/
public BlockECRecoveryCommand(int action,
Collection<BlockECRecoveryInfo> blockECRecoveryInfoList) {
super(action);
this.ecTasks = blockECRecoveryInfoList;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("BlockECRecoveryCommand(\n ");
Joiner.on("\n ").appendTo(sb, ecTasks);
sb.append("\n)");
return sb.toString();
}
}
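
To make the pull/decode/push flow in the class comment concrete, here is a hypothetical, much-simplified sketch of how a DataNode might execute one task. The helpers readFrom, rsDecode and writeTo are illustrative placeholders, not APIs introduced by this commit, and real striped reconstruction would stream cells rather than buffer whole blocks:

  // Hypothetical sketch of the recovery flow described above.
  void recover(BlockECRecoveryInfo task) throws IOException {
    // 1. Pull: read the surviving internal blocks from the source DataNodes.
    byte[][] survivors = new byte[task.sources.length][];
    for (int i = 0; i < task.sources.length; i++) {
      survivors[i] = readFrom(task.sources[i], task.block);
    }
    // 2. Decode: recompute the missing blocks through codec calculation,
    //    e.g. Reed-Solomon decoding over the surviving blocks.
    byte[][] rebuilt = rsDecode(survivors, task.missingBlockIndices);
    // 3. Push: write each reconstructed block to its final destination.
    for (int i = 0; i < task.targets.length; i++) {
      writeTo(task.targets[i], rebuilt[i]);
    }
  }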
@@ -76,6 +76,7 @@ public interface DatanodeProtocol {
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
final static int DNA_CACHE = 9; // cache blocks
final static int DNA_UNCACHE = 10; // uncache blocks
final static int DNA_CODEC = 11; // erasure coding recovery

/**
* Register Datanode.
@@ -161,7 +161,7 @@ public static int computeInvalidationWork(BlockManager bm) {
*/
public static int computeAllPendingWork(BlockManager bm) {
int work = computeInvalidationWork(bm);
work += bm.computeBlockRecoveryWork(Integer.MAX_VALUE);
return work;
}

@@ -453,8 +453,8 @@ private DatanodeStorageInfo[] scheduleSingleReplication(BlockInfo block) {
assertEquals("Block not initially pending replication", 0,
bm.pendingReplications.getNumReplicas(block));
assertEquals(
"computeBlockRecoveryWork should indicate replication is needed", 1,
bm.computeRecoveryWorkForBlocks(list_all));
assertTrue("replication is pending after work is computed",
bm.pendingReplications.getNumReplicas(block) > 0);

@@ -508,35 +508,35 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
assertNotNull("Chooses source node for a highest-priority replication"
+ " even if all available source nodes have reached their replication"
+ " limits below the hard limit.",
bm.chooseSourceDatanodes(
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);

assertNull("Does not choose a source node for a less-than-highest-priority"
+ " replication since all available source nodes have reached"
+ " their replication limits.",
bm.chooseSourceDatanodes(
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)[0]);

// Increase the replication count to test replication count > hard limit
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
origNodes.get(0).addBlockToBeReplicated(aBlock, targets);

assertNull("Does not choose a source node for a highest-priority"
+ " replication when all available nodes exceed the hard limit.",
bm.chooseSourceDatanodes(
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
}

@Test
