Commit
HDFS-644. Lease recovery, concurrency support. Contributed by Konstantin Shvachko.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@819215 13f79535-47bb-0310-9956-ffa450edef68
shvachko committed Sep 26, 2009
1 parent f944000 commit d819628
Showing 18 changed files with 469 additions and 153 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
@@ -10,6 +10,8 @@ Append branch (unreleased changes)

HDFS-636. SafeMode counts complete blocks only. (shv)

HDFS-644. Lease recovery, concurrency support. (shv)

NEW FEATURES

HDFS-536. Support hflush at DFSClient. (hairong)
@@ -42,6 +42,7 @@ public interface ClientDatanodeProtocol extends VersionedProtocol {
* generated access token is returned as part of the return value.
* @throws IOException
*/
@Deprecated // not used anymore - should be removed
LocatedBlock recoverBlock(Block block, boolean keepLength,
DatanodeInfo[] targets) throws IOException;
}
@@ -15,17 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
package org.apache.hadoop.hdfs.protocol;

import java.io.IOException;

/**
* Exception indicating that a replica is already being recovered.
*/
class RecoveryInProgressException extends IOException {
public class RecoveryInProgressException extends IOException {
private static final long serialVersionUID = 1L;

RecoveryInProgressException(String msg) {
public RecoveryInProgressException(String msg) {
super(msg);
}
}
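
RecoveryInProgressException moves from the datanode package to org.apache.hadoop.hdfs.protocol and becomes public, presumably so that recovery initiators outside the datanode package can catch it when a competing recovery already owns a replica. A minimal caller-side sketch (not part of this commit; RecoveryAction and recoverWithBackoff are hypothetical placeholders for the real RPC entry point):

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;

/** Hypothetical caller that backs off while a competing recovery owns the replica. */
class RecoveryRetrySketch {

  /** Stand-in for whatever call actually triggers replica recovery. */
  interface RecoveryAction {
    void run() throws IOException;
  }

  static void recoverWithBackoff(RecoveryAction action, int attempts, long waitMs)
      throws IOException, InterruptedException {
    for (int i = 0; i < attempts; i++) {
      try {
        action.run();                 // attempt to start (or finish) the recovery
        return;
      } catch (RecoveryInProgressException e) {
        // Another recovery, started concurrently by the namenode or a client,
        // already owns this replica; wait for it to finish and retry.
        Thread.sleep(waitMs);
      }
    }
    throw new IOException("replica recovery still in progress after " + attempts + " attempts");
  }
}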
66 changes: 32 additions & 34 deletions src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -34,6 +34,7 @@
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
@@ -69,6 +70,7 @@
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -77,6 +79,7 @@
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
@@ -913,7 +916,7 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
@@ -1515,16 +1518,16 @@ public BlockMetaDataInfo getBlockMetaDataInfo(Block block
return info;
}

public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
for(int i = 0; i < blocks.length; i++) {
for(RecoveringBlock b : blocks) {
try {
logRecoverBlock("NameNode", blocks[i], targets[i]);
recoverBlock(blocks[i], false, targets[i], true);
logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
recoverBlock(b);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
LOG.warn("recoverBlocks FAILED: " + b, e);
}
}
}
@@ -1580,9 +1583,9 @@ public String toString() {
}

/** Recover a block */
private LocatedBlock recoverBlock(Block block, boolean keepLength,
DatanodeInfo[] targets, boolean closeFile) throws IOException {

private LocatedBlock recoverBlock(RecoveringBlock rBlock) throws IOException {
Block block = rBlock.getBlock();
DatanodeInfo[] targets = rBlock.getLocations();
DatanodeID[] datanodeids = (DatanodeID[])targets;
// If the block is already being recovered, then skip recovering it.
// This can happen if the namenode and client start recovering the same
@@ -1609,16 +1612,9 @@ private LocatedBlock recoverBlock(Block block, boolean keepLength,
this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
if (keepLength) {
if (info.getNumBytes() == block.getNumBytes()) {
syncList.add(new BlockRecord(id, datanode, new Block(info)));
}
}
else {
syncList.add(new BlockRecord(id, datanode, new Block(info)));
if (info.getNumBytes() < minlength) {
minlength = info.getNumBytes();
}
syncList.add(new BlockRecord(id, datanode, new Block(info)));
if (info.getNumBytes() < minlength) {
minlength = info.getNumBytes();
}
}
} catch (IOException e) {
@@ -1633,10 +1629,8 @@ private LocatedBlock recoverBlock(Block block, boolean keepLength,
throw new IOException("All datanodes failed: block=" + block
+ ", datanodeids=" + Arrays.asList(datanodeids));
}
if (!keepLength) {
block.setNumBytes(minlength);
}
return syncBlock(block, syncList, targets, closeFile);
block.setNumBytes(minlength);
return syncBlock(rBlock, syncList);
} finally {
synchronized (ongoingRecovery) {
ongoingRecovery.remove(block);
@@ -1645,20 +1639,22 @@ private LocatedBlock recoverBlock(Block block, boolean keepLength,
}

/** Block synchronization */
private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
DatanodeInfo[] targets, boolean closeFile) throws IOException {
private LocatedBlock syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException {
Block block = rBlock.getBlock();
long newGenerationStamp = rBlock.getNewGenerationStamp();
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+ "), syncList=" + syncList + ", closeFile=" + closeFile);
+ "), syncList=" + syncList);
}

//syncList.isEmpty() means that all datanodes do not have the block
//so the block can be deleted.
if (syncList.isEmpty()) {
namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
DatanodeID.EMPTY_ARRAY);
namenode.commitBlockSynchronization(block, newGenerationStamp, 0,
true, true, DatanodeID.EMPTY_ARRAY);
//always return a new access token even if everything else stays the same
LocatedBlock b = new LocatedBlock(block, targets);
LocatedBlock b = new LocatedBlock(block, rBlock.getLocations());
if (isAccessTokenEnabled) {
b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
@@ -1668,12 +1664,12 @@ private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,

List<DatanodeID> successList = new ArrayList<DatanodeID>();

long generationstamp = namenode.nextGenerationStamp(block);
Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
Block newblock =
new Block(block.getBlockId(), block.getNumBytes(), newGenerationStamp);

for(BlockRecord r : syncList) {
try {
r.datanode.updateBlock(r.block, newblock, closeFile);
r.datanode.updateBlock(r.block, newblock, true);
successList.add(r.id);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
@@ -1685,7 +1681,7 @@ private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);

namenode.commitBlockSynchronization(block,
newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
newblock.getGenerationStamp(), newblock.getNumBytes(), true, false,
nlist);
DatanodeInfo[] info = new DatanodeInfo[nlist.length];
for (int i = 0; i < nlist.length; i++) {
@@ -1712,10 +1708,12 @@ private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,

// ClientDataNodeProtocol implementation
/** {@inheritDoc} */
@SuppressWarnings("deprecation")
public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
) throws IOException {
logRecoverBlock("Client", block, targets);
return recoverBlock(block, keepLength, targets, false);
assert false : "ClientDatanodeProtocol.recoverBlock: should never be called.";
return null;
}

private static void logRecoverBlock(String who,
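
The DataNode changes above replace the (Block[], DatanodeInfo[][]) pair with RecoveringBlock objects that already carry the recovery id (the new generation stamp) chosen by the namenode, so syncBlock no longer calls namenode.nextGenerationStamp itself, and the deprecated keepLength path is gone. The concurrency guard that makes recoverBlock skip a block already being recovered can be reduced to the following standalone sketch; ongoingRecovery, tryRecover and doRecovery are illustrative names rather than the exact DataNode members:

import java.util.HashMap;
import java.util.Map;

/** Standalone sketch of the concurrent-recovery guard around recoverBlock. */
class RecoveryGuardSketch {
  // Blocks currently being recovered by this datanode, keyed by block id.
  private final Map<Long, Object> ongoingRecovery = new HashMap<Long, Object>();

  boolean tryRecover(long blockId, Runnable doRecovery) {
    synchronized (ongoingRecovery) {
      if (ongoingRecovery.containsKey(blockId)) {
        return false;                 // a namenode- or client-initiated recovery got here first
      }
      ongoingRecovery.put(blockId, new Object());
    }
    try {
      doRecovery.run();               // contact replicas, truncate to the minimum length, syncBlock(...)
      return true;
    } finally {
      synchronized (ongoingRecovery) {
        ongoingRecovery.remove(blockId);  // always release the guard, even if recovery failed
      }
    }
  }
}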
@@ -45,6 +45,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.metrics.util.MBeanUtil;
@@ -1414,7 +1415,9 @@ synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
public synchronized void finalizeBlock(Block b) throws IOException {
ReplicaInfo replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
throw new IOException("Block " + b + " is already finalized.");
// this is legal when recovery happens on a file that has
// been opened for append but never modified
return;
}
ReplicaInfo newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR &&
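
The FSDataset change makes finalizeBlock tolerant of an already-finalized replica instead of throwing, which the inline comment attributes to recovery of a file that was opened for append but never modified. A reduced sketch of that idempotent behaviour, with simplified stand-in types (the real FSDataset tracks ReplicaInfo objects and volumes, not an enum in a map):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Simplified sketch of idempotent finalize; types are stand-ins, not FSDataset's. */
class FinalizeSketch {
  enum ReplicaState { TEMPORARY, RBW, RWR, RUR, FINALIZED }

  private final Map<Long, ReplicaState> volumeMap = new HashMap<Long, ReplicaState>();

  synchronized void finalizeBlock(long blockId) throws IOException {
    ReplicaState state = volumeMap.get(blockId);
    if (state == null) {
      throw new IOException("replica not found: " + blockId);
    }
    if (state == ReplicaState.FINALIZED) {
      // Legal after this change: recovering a file opened for append but never
      // written to may finalize a replica that is already finalized.
      return;
    }
    volumeMap.put(blockId, ReplicaState.FINALIZED);   // promote the replica to finalized
  }
}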
@@ -306,7 +306,7 @@ BlockInfoUnderConstruction convertToBlockUnderConstruction(
// the block is already under construction
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;
ucBlock.setBlockUCState(s);
ucBlock.setLocations(targets);
ucBlock.setExpectedLocations(targets);
ucBlock.setLastRecoveryTime(0);
return ucBlock;
}
@@ -45,6 +45,13 @@ class BlockInfoUnderConstruction extends BlockInfo {
/** The last time the block was recovered. */
private long lastRecoveryTime = 0;

/**
* The new generation stamp, which this block will have
* after the recovery succeeds. Also used as a recovery id to identify
* the right recovery if any of the abandoned recoveries re-appear.
*/
private long blockRecoveryId = 0;

/**
* ReplicaUnderConstruction contains information about replicas while
* they are under construction.
@@ -123,7 +130,7 @@ public boolean equals(Object obj) {
assert getBlockUCState() != BlockUCState.COMPLETE :
"BlockInfoUnderConstruction cannot be in COMPLETE state";
this.blockUCState = state;
setLocations(targets);
setExpectedLocations(targets);
}

/**
@@ -144,7 +151,7 @@ assert getBlockUCState() != BlockUCState.COMPLETE :
return new BlockInfo(this);
}

void setLocations(DatanodeDescriptor[] targets) {
void setExpectedLocations(DatanodeDescriptor[] targets) {
int numLocations = targets == null ? 0 : targets.length;
this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
for(int i = 0; i < numLocations; i++)
@@ -156,15 +163,15 @@ void setLocations(DatanodeDescriptor[] targets) {
* Create array of expected replica locations
* (as has been assigned by chooseTargets()).
*/
private DatanodeDescriptor[] getExpectedLocations() {
DatanodeDescriptor[] getExpectedLocations() {
int numLocations = replicas == null ? 0 : replicas.size();
DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
for(int i = 0; i < numLocations; i++)
locations[i] = replicas.get(i).getExpectedLocation();
return locations;
}

int getNumLocations() {
int getNumExpectedLocations() {
return replicas == null ? 0 : replicas.size();
}

@@ -181,6 +188,10 @@ void setBlockUCState(BlockUCState s) {
blockUCState = s;
}

long getBlockRecoveryId() {
return blockRecoveryId;
}

/**
* Commit block's length and generation stamp as reported by the client.
* Set block state to {@link BlockUCState#COMMITTED}.
@@ -197,9 +208,12 @@ void commitBlock(Block block) throws IOException {

/**
* Initialize lease recovery for this block.
* Find the first alive data-node starting from the previous primary.
* Find the first alive data-node starting from the previous primary and
* make it primary.
*/
void assignPrimaryDatanode() {
void initializeBlockRecovery(long recoveryId) {
setBlockUCState(BlockUCState.UNDER_RECOVERY);
blockRecoveryId = recoveryId;
if (replicas.size() == 0) {
NameNode.stateChangeLog.warn("BLOCK*"
+ " INodeFileUnderConstruction.initLeaseRecovery:"
@@ -212,7 +226,7 @@ void assignPrimaryDatanode() {
if (replicas.get(j).isAlive()) {
primaryNodeIndex = j;
DatanodeDescriptor primary = replicas.get(j).getExpectedLocation();
primary.addBlockToBeRecovered(this, getExpectedLocations());
primary.addBlockToBeRecovered(this);
NameNode.stateChangeLog.info("BLOCK* " + this
+ " recovery started, primary=" + primary);
return;
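
In BlockInfoUnderConstruction, assignPrimaryDatanode becomes initializeBlockRecovery: it moves the block to UNDER_RECOVERY, records the recovery id that will become the block's generation stamp if recovery succeeds, and then picks as primary the first alive replica found when walking the expected locations starting just after the previous primary. A minimal standalone sketch of one way to do that ring walk, with a plain boolean list standing in for ReplicaUnderConstruction.isAlive():

import java.util.Arrays;
import java.util.List;

/** Sketch of the primary-selection walk in initializeBlockRecovery. */
class PrimarySelectionSketch {
  static int choosePrimary(List<Boolean> replicaAlive, int previousPrimaryIndex) {
    int n = replicaAlive.size();
    for (int i = 1; i <= n; i++) {
      int j = (previousPrimaryIndex + i) % n;   // start just after the previous primary
      if (replicaAlive.get(j)) {
        return j;                               // first alive replica becomes the new primary
      }
    }
    return -1;                                  // no alive replicas: recovery cannot be scheduled
  }

  public static void main(String[] args) {
    // Replica 1 was the previous primary and is now dead; replica 2 takes over.
    System.out.println(choosePrimary(Arrays.asList(true, false, true), 1));   // prints 2
  }
}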
