diff --git a/CHANGES.txt b/CHANGES.txt index 2edbbb2f..7c5dee5f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,8 @@ Append branch (unreleased changes) HDFS-636. SafeMode counts complete blocks only. (shv) + HDFS-644. Lease recovery, concurrency support. (shv) + NEW FEATURES HDFS-536. Support hflush at DFSClient. (hairong) diff --git a/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index 5d7bf53a..3f90b206 100644 --- a/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -42,6 +42,7 @@ public interface ClientDatanodeProtocol extends VersionedProtocol { * generated access token is returned as part of the return value. * @throws IOException */ + @Deprecated // not used anymore - should be removed LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets) throws IOException; } diff --git a/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java b/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java index 11b5f3df..43e812c3 100644 --- a/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java +++ b/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java @@ -15,17 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.datanode; +package org.apache.hadoop.hdfs.protocol; import java.io.IOException; /** * Exception indicating that a replica is already being recovered. */ -class RecoveryInProgressException extends IOException { +public class RecoveryInProgressException extends IOException { private static final long serialVersionUID = 1L; - RecoveryInProgressException(String msg) { + public RecoveryInProgressException(String msg) { super(msg); } } \ No newline at end of file
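Editorial note: now that RecoveryInProgressException is public and lives in the protocol package, callers outside the datanode can handle it. A minimal sketch of a retry wrapper such a caller might use (the RecoveryRetry helper and its retry policy are illustrative only, not part of this patch):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;

    // Hypothetical helper: retry an operation that may collide with an
    // in-flight recovery of the same block.
    class RecoveryRetry {
      interface Call { void run() throws IOException; }

      static void callWithRetry(Call call, int maxAttempts, long sleepMs)
          throws IOException, InterruptedException {
        for (int attempt = 1; ; attempt++) {
          try {
            call.run();
            return;
          } catch (RecoveryInProgressException e) {
            // Another recovery owns the block right now; back off and retry.
            if (attempt >= maxAttempts) throw e;
            Thread.sleep(sleepMs);
          }
        }
      }
    }

diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 899819c1..3c38f642 100644 --- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -34,6 +34,7 @@ import java.util.AbstractList; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedList; @@ -69,6 +70,7 @@ import org.apache.hadoop.hdfs.server.namenode.StreamFile; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -77,6 +79,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RPC; @@ -913,7 +916,7 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { processDistributedUpgradeCommand((UpgradeCommand)cmd); break; case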
DatanodeProtocol.DNA_RECOVERBLOCK: - recoverBlocks(bcmd.getBlocks(), bcmd.getTargets()); + recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks()); break; case DatanodeProtocol.DNA_ACCESSKEYUPDATE: LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE"); @@ -1515,16 +1518,16 @@ public BlockMetaDataInfo getBlockMetaDataInfo(Block block return info; } - public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) { + public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) { Daemon d = new Daemon(threadGroup, new Runnable() { /** Recover a list of blocks. It is run by the primary datanode. */ public void run() { - for(int i = 0; i < blocks.length; i++) { + for(RecoveringBlock b : blocks) { try { - logRecoverBlock("NameNode", blocks[i], targets[i]); - recoverBlock(blocks[i], false, targets[i], true); + logRecoverBlock("NameNode", b.getBlock(), b.getLocations()); + recoverBlock(b); } catch (IOException e) { - LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e); + LOG.warn("recoverBlocks FAILED: " + b, e); } } } @@ -1580,9 +1583,9 @@ public String toString() { } /** Recover a block */ - private LocatedBlock recoverBlock(Block block, boolean keepLength, - DatanodeInfo[] targets, boolean closeFile) throws IOException { - + private LocatedBlock recoverBlock(RecoveringBlock rBlock) throws IOException { + Block block = rBlock.getBlock(); + DatanodeInfo[] targets = rBlock.getLocations(); DatanodeID[] datanodeids = (DatanodeID[])targets; // If the block is already being recovered, then skip recovering it. // This can happen if the namenode and client start recovering the same @@ -1609,16 +1612,9 @@ private LocatedBlock recoverBlock(Block block, boolean keepLength, this: DataNode.createInterDataNodeProtocolProxy(id, getConf()); BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block); if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) { - if (keepLength) { - if (info.getNumBytes() == block.getNumBytes()) { - syncList.add(new BlockRecord(id, datanode, new Block(info))); - } - } - else { - syncList.add(new BlockRecord(id, datanode, new Block(info))); - if (info.getNumBytes() < minlength) { - minlength = info.getNumBytes(); - } + syncList.add(new BlockRecord(id, datanode, new Block(info))); + if (info.getNumBytes() < minlength) { + minlength = info.getNumBytes(); } } } catch (IOException e) { @@ -1633,10 +1629,8 @@ private LocatedBlock recoverBlock(Block block, boolean keepLength, throw new IOException("All datanodes failed: block=" + block + ", datanodeids=" + Arrays.asList(datanodeids)); } - if (!keepLength) { - block.setNumBytes(minlength); - } - return syncBlock(block, syncList, targets, closeFile); + block.setNumBytes(minlength); + return syncBlock(rBlock, syncList); } finally { synchronized (ongoingRecovery) { ongoingRecovery.remove(block); @@ -1645,20 +1639,22 @@ private LocatedBlock recoverBlock(Block block, boolean keepLength, } /** Block synchronization */ - private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList, - DatanodeInfo[] targets, boolean closeFile) throws IOException { + private LocatedBlock syncBlock(RecoveringBlock rBlock, + List<BlockRecord> syncList) throws IOException { + Block block = rBlock.getBlock(); + long newGenerationStamp = rBlock.getNewGenerationStamp(); if (LOG.isDebugEnabled()) { LOG.debug("block=" + block + ", (length=" + block.getNumBytes() - + "), syncList=" + syncList + ", closeFile=" + closeFile); + + "), syncList=" + syncList); } //syncList.isEmpty() means that all datanodes do not have the block //so the
block can be deleted. if (syncList.isEmpty()) { - namenode.commitBlockSynchronization(block, 0, 0, closeFile, true, - DatanodeID.EMPTY_ARRAY); + namenode.commitBlockSynchronization(block, newGenerationStamp, 0, + true, true, DatanodeID.EMPTY_ARRAY); //always return a new access token even if everything else stays the same - LocatedBlock b = new LocatedBlock(block, targets); + LocatedBlock b = new LocatedBlock(block, rBlock.getLocations()); if (isAccessTokenEnabled) { b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock() .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE))); @@ -1668,12 +1664,12 @@ private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList, List<DatanodeID> successList = new ArrayList<DatanodeID>(); - long generationstamp = namenode.nextGenerationStamp(block); - Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp); + Block newblock = + new Block(block.getBlockId(), block.getNumBytes(), newGenerationStamp); for(BlockRecord r : syncList) { try { - r.datanode.updateBlock(r.block, newblock, closeFile); + r.datanode.updateBlock(r.block, newblock, true); successList.add(r.id); } catch (IOException e) { InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" @@ -1685,7 +1681,7 @@ private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList, DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]); namenode.commitBlockSynchronization(block, - newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false, + newblock.getGenerationStamp(), newblock.getNumBytes(), true, false, nlist); DatanodeInfo[] info = new DatanodeInfo[nlist.length]; for (int i = 0; i < nlist.length; i++) { @@ -1712,10 +1708,12 @@ private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList, // ClientDataNodeProtocol implementation /** {@inheritDoc} */ + @SuppressWarnings("deprecation") public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets ) throws IOException { logRecoverBlock("Client", block, targets); - return recoverBlock(block, keepLength, targets, false); + assert false : "ClientDatanodeProtocol.recoverBlock: should never be called."; + return null; } private static void logRecoverBlock(String who,
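With the signature change above, the primary datanode is driven by a collection of RecoveringBlock instead of parallel arrays. A minimal sketch of invoking it directly, e.g. from a test (the block, locations and new generation stamp would normally arrive inside a BlockRecoveryCommand; the helper below is illustrative only):

    import java.util.Collections;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;

    // Hypothetical driver: run one block recovery on the primary datanode
    // and wait for the recovery daemon thread to finish.
    class RecoverOneBlock {
      static void recover(DataNode dn, Block b, DatanodeInfo[] locs, long newGS)
          throws InterruptedException {
        RecoveringBlock rb = new RecoveringBlock(b, locs, newGS);
        dn.recoverBlocks(Collections.singleton(rb)).join(); // returns a Daemon
      }
    }

diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java index 080d61f5..8709a8e8 100644 --- a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java +++ b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.metrics.util.MBeanUtil; @@ -1414,7 +1415,9 @@ synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException { public synchronized void finalizeBlock(Block b) throws IOException { ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo.getState() == ReplicaState.FINALIZED) { - throw new IOException("Block " + b + " is already finalized."); + // this is legal when recovery happens on a file that has + // been opened for append but never modified + return; } ReplicaInfo newReplicaInfo = null; if (replicaInfo.getState() == ReplicaState.RUR && diff --git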
a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java index 6e8ec755..220c5d98 100644 --- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java +++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java @@ -306,7 +306,7 @@ BlockInfoUnderConstruction convertToBlockUnderConstruction( // the block is already under construction BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this; ucBlock.setBlockUCState(s); - ucBlock.setLocations(targets); + ucBlock.setExpectedLocations(targets); ucBlock.setLastRecoveryTime(0); return ucBlock; } diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java index 1f5f5f54..8c0b460f 100644 --- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java +++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java @@ -45,6 +45,13 @@ class BlockInfoUnderConstruction extends BlockInfo { /** The last time the block was recovered. */ private long lastRecoveryTime = 0; + /** + * The new generation stamp, which this block will have + * after the recovery succeeds. Also used as a recovery id to identify + * the right recovery if any of the abandoned recoveries re-appear. + */ + private long blockRecoveryId = 0; + /** * ReplicaUnderConstruction contains information about replicas while * they are under construction. @@ -123,7 +130,7 @@ public boolean equals(Object obj) { assert getBlockUCState() != BlockUCState.COMPLETE : "BlockInfoUnderConstruction cannot be in COMPLETE state"; this.blockUCState = state; - setLocations(targets); + setExpectedLocations(targets); } /** @@ -144,7 +151,7 @@ assert getBlockUCState() != BlockUCState.COMPLETE : return new BlockInfo(this); } - void setLocations(DatanodeDescriptor[] targets) { + void setExpectedLocations(DatanodeDescriptor[] targets) { int numLocations = targets == null ? 0 : targets.length; this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations); for(int i = 0; i < numLocations; i++) @@ -156,7 +163,7 @@ void setLocations(DatanodeDescriptor[] targets) { * Create array of expected replica locations * (as has been assigned by chooseTargets()). */ - private DatanodeDescriptor[] getExpectedLocations() { + DatanodeDescriptor[] getExpectedLocations() { int numLocations = replicas == null ? 0 : replicas.size(); DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations]; for(int i = 0; i < numLocations; i++) @@ -164,7 +171,7 @@ private DatanodeDescriptor[] getExpectedLocations() { return locations; } - int getNumLocations() { + int getNumExpectedLocations() { return replicas == null ? 0 : replicas.size(); } @@ -181,6 +188,10 @@ void setBlockUCState(BlockUCState s) { blockUCState = s; } + long getBlockRecoveryId() { + return blockRecoveryId; + } + /** * Commit block's length and generation stamp as reported by the client. * Set block state to {@link BlockUCState#COMMITTED}. @@ -197,9 +208,12 @@ void commitBlock(Block block) throws IOException { /** * Initialize lease recovery for this block. - * Find the first alive data-node starting from the previous primary. + * Find the first alive data-node starting from the previous primary and + * make it primary.
*/ - void assignPrimaryDatanode() { + void initializeBlockRecovery(long recoveryId) { + setBlockUCState(BlockUCState.UNDER_RECOVERY); + blockRecoveryId = recoveryId; if (replicas.size() == 0) { NameNode.stateChangeLog.warn("BLOCK*" + " BlockInfoUnderConstruction.initializeBlockRecovery:" @@ -212,7 +226,7 @@ void assignPrimaryDatanode() { if (replicas.get(j).isAlive()) { primaryNodeIndex = j; DatanodeDescriptor primary = replicas.get(j).getExpectedLocation(); - primary.addBlockToBeRecovered(this, getExpectedLocations()); + primary.addBlockToBeRecovered(this); NameNode.stateChangeLog.info("BLOCK* " + this + " recovery started, primary=" + primary); return;
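initializeBlockRecovery records the recovery id, flips the block to UNDER_RECOVERY, and then, as the javadoc says, finds the first alive data-node starting from the previous primary, so a dead primary cannot stall recovery forever. A toy sketch of that rotation, detached from the surrounding classes (names are illustrative):

    import java.util.List;

    // Toy model of the primary-datanode rotation: pick the first alive
    // location at or after `start`, wrapping around; -1 if none is alive.
    class PrimaryRotation {
      static int nextPrimary(List<Boolean> alive, int start) {
        int n = alive.size();
        for (int i = 0; i < n; i++) {
          int j = (start + i) % n;
          if (alive.get(j)) return j;
        }
        return -1;
      }
    }

diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java b/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java index c21e30f4..4a60235f 100644 --- a/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java +++ b/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator; import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.io.Text; import org.apache.hadoop.hdfs.DeprecatedUTF8; @@ -57,29 +59,36 @@ public static class BlockTargetPair { } /** A BlockTargetPair queue. */ - private static class BlockQueue { - private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>(); + private static class BlockQueue<E> { + private final Queue<E> blockq = new LinkedList<E>(); /** Size of the queue */ synchronized int size() {return blockq.size();} /** Enqueue */ - synchronized boolean offer(Block block, DatanodeDescriptor[] targets) { - return blockq.offer(new BlockTargetPair(block, targets)); + synchronized boolean offer(E e) { + return blockq.offer(e); } /** Dequeue */ - synchronized List<BlockTargetPair> poll(int numBlocks) { + synchronized List<E> poll(int numBlocks) { if (numBlocks <= 0 || blockq.isEmpty()) { return null; } - List<BlockTargetPair> results = new ArrayList<BlockTargetPair>(); + List<E> results = new ArrayList<E>(); for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) { results.add(blockq.poll()); } return results; } + + /** + * Returns true if the queue contains the specified element. + */ + boolean contains(E e) { + return blockq.contains(e); + } } private volatile BlockInfo blockList = null; @@ -89,9 +98,10 @@ synchronized List<BlockTargetPair> poll(int numBlocks) { protected boolean needKeyUpdate = false; /** A queue of blocks to be replicated by this datanode */ - private BlockQueue replicateBlocks = new BlockQueue(); + private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>(); /** A queue of blocks to be recovered by this datanode */ - private BlockQueue recoverBlocks = new BlockQueue(); + private BlockQueue<BlockInfoUnderConstruction> recoverBlocks = + new BlockQueue<BlockInfoUnderConstruction>(); /** A set of blocks to be invalidated by this datanode */ private Set<Block> invalidateBlocks = new TreeSet<Block>(); @@ -279,15 +289,20 @@ Iterator<Block> getBlockIterator() { */ void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) { assert(block != null && targets != null && targets.length > 0); - replicateBlocks.offer(block, targets); + replicateBlocks.offer(new BlockTargetPair(block, targets)); } /** * Store block recovery work.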
*/ - void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) { - assert(block != null && targets != null && targets.length > 0); - recoverBlocks.offer(block, targets); + void addBlockToBeRecovered(BlockInfoUnderConstruction block) { + if(recoverBlocks.contains(block)) { + // this prevents adding the same block twice to the recovery queue + FSNamesystem.LOG.info("Block " + block + + " is already in the recovery queue."); + return; + } + recoverBlocks.offer(block); } /** @@ -325,10 +340,16 @@ BlockCommand getReplicationCommand(int maxTransfers) { new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist); } - BlockCommand getLeaseRecoveryCommand(int maxTransfers) { - List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers); - return blocktargetlist == null? null: - new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist); + BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) { + List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers); + if(blocks == null) + return null; + BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size()); + for(BlockInfoUnderConstruction b : blocks) { + brCommand.add(new RecoveringBlock( + b, b.getExpectedLocations(), b.getBlockRecoveryId())); + } + return brCommand; } /** @@ -444,7 +465,7 @@ BlockInfo processReportedBlock( Collection<BlockInfo> toCorrupt) {// add to corrupt replicas FSNamesystem.LOG.debug("Reported block " + block + " on " + getName() + " size " + block.getNumBytes() - + "replicaState = " + rState); + + " replicaState = " + rState); // find block by blockId BlockInfo storedBlock = blockManager.findStoredBlock(block.getBlockId());
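The generified BlockQueue above is what lets one queue implementation serve both replication work (BlockTargetPair) and recovery work (BlockInfoUnderConstruction). A standalone, self-contained rendition of the same pattern, with String standing in for the element types:

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    // Standalone rendition of the generified queue pattern used above.
    class WorkQueue<E> {
      private final Queue<E> q = new LinkedList<E>();

      synchronized boolean offer(E e) { return q.offer(e); }

      // Drain up to max elements; null signals "no work", matching poll() above.
      synchronized List<E> poll(int max) {
        if (max <= 0 || q.isEmpty()) return null;
        List<E> batch = new ArrayList<E>();
        for (; !q.isEmpty() && max > 0; max--) batch.add(q.poll());
        return batch;
      }

      public static void main(String[] args) {
        WorkQueue<String> recover = new WorkQueue<String>();
        recover.offer("blk_1");
        recover.offer("blk_2");
        System.out.println(recover.poll(10)); // [blk_1, blk_2]
      }
    }

diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 06df33ed..963abcf6 100644 --- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -528,7 +528,7 @@ int loadEditRecords(int logVersion, DataInputStream in, clientMachine, null); fsDir.replaceNode(path, node, cons); - fsNamesys.leaseManager.addLease(cons.clientName, path); + fsNamesys.leaseManager.addLease(cons.getClientName(), path); } break; } diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index efcf903a..d1841f5c 100644 --- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -1404,7 +1404,7 @@ private void loadFilesUnderConstruction(int version, DataInputStream in, } INodeFile oldnode = (INodeFile) old; fsDir.replaceNode(path, oldnode, cons); - fs.leaseManager.addLease(cons.clientName, path); + fs.leaseManager.addLease(cons.getClientName(), path); } } diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index d11981fd..39150531 100644 --- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -918,40 +918,45 @@ private synchronized void startFileInternal(String src, // If the file is under construction , then it must be in our // leases. Find the appropriate lease record. // - Lease lease = leaseManager.getLease(holder); - // - // We found the lease for this file. And surprisingly the original - // holder is trying to recreate this file. This should never occur.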
- // - if (lease != null) { + Lease lease = leaseManager.getLeaseByPath(src); + if (lease == null) { throw new AlreadyBeingCreatedException( - "failed to create file " + src + " for " + holder + - " on client " + clientMachine + - " because current leaseholder is trying to recreate file."); + "failed to create file " + src + " for " + holder + + " on client " + clientMachine + + " because pendingCreates is non-null but no leases found."); } // - // Find the original holder. + // We found the lease for this file. And surprisingly the original + // holder is trying to recreate this file. This should never occur. // - lease = leaseManager.getLease(pendingFile.clientName); - if (lease == null) { + if (lease.getHolder().equals(holder)) { throw new AlreadyBeingCreatedException( - "failed to create file " + src + " for " + holder + - " on client " + clientMachine + - " because pendingCreates is non-null but no leases found."); + "failed to create file " + src + " for " + holder + + " on client " + clientMachine + + " because current leaseholder is trying to recreate file."); } + assert lease.getHolder().equals(pendingFile.getClientName()) : + "Current lease holder " + lease.getHolder() + + " does not match file creator " + pendingFile.getClientName(); // + // Current lease holder is different from the requester. // If the original holder has not renewed in the last SOFTLIMIT - // period, then start lease recovery. + // period, then start lease recovery, otherwise fail. // if (lease.expiredSoftLimit()) { LOG.info("startFile: recover lease " + lease + ", src=" + src); - internalReleaseLease(lease, src); - } - throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder + - " on client " + clientMachine + - ", because this file is already being created by " + - pendingFile.getClientName() + - " on " + pendingFile.getClientMachine()); + boolean isClosed = internalReleaseLease(lease, src, null); + if(!isClosed) + throw new RecoveryInProgressException( + "Failed to close file " + src + + ". Lease recovery is in progress. Try again later."); + + } else + throw new AlreadyBeingCreatedException("failed to create file " + + src + " for " + holder + " on client " + clientMachine + + ", because this file is already being created by " + + pendingFile.getClientName() + + " on " + pendingFile.getClientMachine()); } try { @@ -1004,7 +1009,7 @@ private synchronized void startFileInternal(String src, clientMachine, clientNode); dir.replaceNode(src, node, cons); - leaseManager.addLease(cons.clientName, src); + leaseManager.addLease(cons.getClientName(), src); } else { // Now we can add the name to the filesystem. This file has no @@ -1020,7 +1025,7 @@ private synchronized void startFileInternal(String src, throw new IOException("DIR* NameSystem.startFile: " + "Unable to add file to namespace."); } - leaseManager.addLease(newNode.clientName, src); + leaseManager.addLease(newNode.getClientName(), src); if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: " +"add "+src+" to namespace for "+holder); @@ -1632,20 +1637,31 @@ void fsync(String src, String clientName) throws IOException { * Move a file that is being written to be immutable. * @param src The filename * @param lease The lease for the client creating the file + * @param recoveryLeaseHolder reassign lease to this holder if the last block + * needs recovery; keep current holder if null. + * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal + * replication;
+ * RecoveryInProgressException if lease recovery is in progress.
+ * IOException in case of an error. + * @return true if file has been successfully finalized and closed or + * false if block recovery has been initiated */ - void internalReleaseLease(Lease lease, String src) throws IOException { + boolean internalReleaseLease( + Lease lease, String src, String recoveryLeaseHolder) + throws AlreadyBeingCreatedException, + IOException { LOG.info("Recovering lease=" + lease + ", src=" + src); INodeFile iFile = dir.getFileINode(src); if (iFile == null) { - final String message = "DIR* NameSystem.internalReleaseCreate: " + final String message = "DIR* NameSystem.internalReleaseLease: " + "attempt to release a create lock on " + src + " file does not exist."; NameNode.stateChangeLog.warn(message); throw new IOException(message); } if (!iFile.isUnderConstruction()) { - final String message = "DIR* NameSystem.internalReleaseCreate: " + final String message = "DIR* NameSystem.internalReleaseLease: " + "attempt to release a create lock on " + src + " but file is already closed."; NameNode.stateChangeLog.warn(message); @@ -1653,35 +1669,112 @@ void internalReleaseLease(Lease lease, String src) throws IOException { } INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile; - BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock(); + int nrBlocks = pendingFile.numBlocks(); + BlockInfo[] blocks = pendingFile.getBlocks(); + + int nrCompleteBlocks; + BlockInfo curBlock = null; + for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) { + curBlock = blocks[nrCompleteBlocks]; + if(!curBlock.isComplete()) + break; + assert blockManager.checkMinReplication(curBlock) : + "A COMPLETE block is not minimally replicated in " + src; + } - // Initialize lease recovery for pendingFile. If there are no blocks - // associated with this file, then reap lease immediately. Otherwise - // renew the lease and trigger lease recovery. - if (lastBlock == null) { - assert pendingFile.getBlocks().length == 0 : - "file is not empty but the last block does not exist"; + // If there are no incomplete blocks associated with this file, + // then reap lease immediately and close the file. + if(nrCompleteBlocks == nrBlocks) { finalizeINodeFileUnderConstruction(src, pendingFile); NameNode.stateChangeLog.warn("BLOCK*" - + " internalReleaseLease: No blocks found, lease removed."); - return; + + " internalReleaseLease: All existing blocks are COMPLETE," + + " lease removed, file closed."); + return true; // closed! } - // setup the last block locations from the blockManager if not known - if(lastBlock.getNumLocations() == 0) { - DatanodeDescriptor targets[] = blockManager.getNodes(lastBlock); - lastBlock.setLocations(targets); + // Only the last and the penultimate blocks may be in non COMPLETE state. + // If the penultimate block is not COMPLETE, then it must be COMMITTED. 
if(nrCompleteBlocks < nrBlocks - 2 || + nrCompleteBlocks == nrBlocks - 2 && + curBlock.getBlockUCState() != BlockUCState.COMMITTED) { + final String message = "DIR* NameSystem.internalReleaseLease: " + + "attempt to release a create lock on " + + src + " but file is already closed."; + NameNode.stateChangeLog.warn(message); + throw new IOException(message); + } + + // now we know that the last block is not COMPLETE, and + // that the penultimate block, if it exists, is either COMPLETE or COMMITTED + BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock(); + BlockUCState lastBlockState = lastBlock.getBlockUCState(); + BlockInfo penultimateBlock = pendingFile.getPenultimateBlock(); + BlockUCState penultimateBlockState = (penultimateBlock == null ? + BlockUCState.COMPLETE : penultimateBlock.getBlockUCState()); + assert penultimateBlockState == BlockUCState.COMPLETE || + penultimateBlockState == BlockUCState.COMMITTED : + "Unexpected state of penultimate block in " + src; + + switch(lastBlockState) { + case COMPLETE: + assert false : "Already checked that the last block is incomplete"; + break; + case COMMITTED: + // Close file if committed blocks are minimally replicated + if(blockManager.checkMinReplication(penultimateBlock) && + blockManager.checkMinReplication(lastBlock)) { + finalizeINodeFileUnderConstruction(src, pendingFile); + NameNode.stateChangeLog.warn("BLOCK*" + + " internalReleaseLease: Committed blocks are minimally replicated," + + " lease removed, file closed."); + return true; // closed! + } + // Cannot close file right now, since some blocks + // are not yet minimally replicated. + // This may potentially cause infinite loop in lease recovery + // if there are no valid replicas on data-nodes. + String message = "DIR* NameSystem.internalReleaseLease: " + + "Failed to release lease for file " + src + + ". Committed blocks are waiting to be minimally replicated." + + " Try again later."; + NameNode.stateChangeLog.warn(message); + throw new AlreadyBeingCreatedException(message); + case UNDER_CONSTRUCTION: + case UNDER_RECOVERY: + // setup the last block locations from the blockManager if not known + if(lastBlock.getNumExpectedLocations() == 0) + lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock)); + // start recovery of the last block for this file + long blockRecoveryId = nextGenerationStamp(); + lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile); + lastBlock.initializeBlockRecovery(blockRecoveryId); + leaseManager.renewLease(lease); + // Cannot close file right now, since the last block requires recovery. + // This may potentially cause infinite loop in lease recovery + // if there are no valid replicas on data-nodes. + NameNode.stateChangeLog.warn( + "DIR* NameSystem.internalReleaseLease: " + + "File " + src + " has not been closed." + + " Lease recovery is in progress. " + + "RecoveryId = " + blockRecoveryId + " for block " + lastBlock); + break; } + return false; + } - // start lease recovery of the last block for this file.
- lastBlock.assignPrimaryDatanode(); - leaseManager.renewLease(lease); + Lease reassignLease(Lease lease, String src, String newHolder, + INodeFileUnderConstruction pendingFile) { + if(newHolder == null) + return lease; + pendingFile.setClientName(newHolder); + return leaseManager.reassignLease(lease, src, newHolder); } + private void finalizeINodeFileUnderConstruction( String src, INodeFileUnderConstruction pendingFile) throws IOException { - leaseManager.removeLease(pendingFile.clientName, src); + leaseManager.removeLease(pendingFile.getClientName(), src); // complete the penultimate block blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2); @@ -1715,11 +1808,20 @@ synchronized void commitBlockSynchronization(Block lastblock, throw new IOException("Block (=" + lastblock + ") not found"); } INodeFile iFile = oldblockinfo.getINode(); - if (!iFile.isUnderConstruction()) { + if (!iFile.isUnderConstruction() || oldblockinfo.isComplete()) { throw new IOException("Unexpected block (=" + lastblock + ") since the file (=" + iFile.getLocalName() + ") is not under construction"); } + + long recoveryId = + ((BlockInfoUnderConstruction)oldblockinfo).getBlockRecoveryId(); + if(recoveryId != newgenerationstamp) { + throw new IOException("The recovery id " + newgenerationstamp + + " does not match current recovery id " + + recoveryId + " for block " + lastblock); + } + INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile; diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java b/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java index 3099ede7..f50353ed 100644 --- a/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java +++ b/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java @@ -25,7 +25,7 @@ class INodeFileUnderConstruction extends INodeFile { - final String clientName; // lease holder + private String clientName; // lease holder private final String clientMachine; private final DatanodeDescriptor clientNode; // if client is a cluster node too. @@ -64,6 +64,10 @@ String getClientName() { return clientName; } + void setClientName(String clientName) { + this.clientName = clientName; + } + String getClientMachine() { return clientMachine; } diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 41ce50fd..069543be 100644 --- a/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -102,7 +102,7 @@ synchronized int countPath() { /** * Adds (or re-adds) the lease for the specified file. */ - synchronized void addLease(String holder, String src) { + synchronized Lease addLease(String holder, String src) { Lease lease = getLease(holder); if (lease == null) { lease = new Lease(holder); @@ -113,6 +113,7 @@ synchronized void addLease(String holder, String src) { } sortedLeasesByPath.put(src, lease); lease.paths.add(src); + return lease; } /** @@ -142,12 +143,23 @@ synchronized void removeLease(String holder, String src) { } } + /** + * Reassign lease for file src to the new holder. 
+ */ + synchronized Lease reassignLease(Lease lease, String src, String newHolder) { + assert newHolder != null : "new lease holder is null"; + if (lease != null) { + removeLease(lease, src); + } + return addLease(newHolder, src); + } + /** * Finds the pathname for the specified pendingFile */ synchronized String findPath(INodeFileUnderConstruction pendingFile ) throws IOException { - Lease lease = getLease(pendingFile.clientName); + Lease lease = getLease(pendingFile.getClientName()); if (lease != null) { String src = lease.findPath(pendingFile); if (src != null) { @@ -265,7 +277,11 @@ public int hashCode() { Collection<String> getPaths() { return paths; } - + + String getHolder() { + return holder; + } + void replacePath(String oldpath, String newpath) { paths.remove(oldpath); paths.add(newpath); @@ -376,7 +392,13 @@ private synchronized void checkLeases() { oldest.getPaths().toArray(leasePaths); for(String p : leasePaths) { try { - fsnamesystem.internalReleaseLease(oldest, p); + if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) { + LOG.info("Lease recovery for file " + p + + " is complete. File closed."); + removing.add(p); + } else + LOG.info("Started block recovery for file " + p + + " lease " + oldest); } catch (IOException e) { LOG.error("Cannot release the path "+p+" in the lease "+oldest, e); removing.add(p);
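checkLeases now branches on the boolean result of internalReleaseLease: true means the file was closed and the lease path can be dropped immediately; false means block recovery was started under the reassigned "HDFS_NameNode" lease, which stays registered until commitBlockSynchronization finishes the recovery. A condensed sketch of that caller contract (the Releaser interface is a hypothetical stand-in for FSNamesystem):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class LeaseSweep {
      // Stand-in for FSNamesystem.internalReleaseLease(lease, path, holder).
      interface Releaser {
        boolean release(String path, String newHolder) throws IOException;
      }

      // Returns the paths whose leases can be removed now; paths under
      // recovery stay registered so a later sweep can re-check them.
      static List<String> sweep(Releaser fsn, List<String> expiredPaths) {
        List<String> removable = new ArrayList<String>();
        for (String p : expiredPaths) {
          try {
            if (fsn.release(p, "HDFS_NameNode")) {
              removable.add(p); // closed immediately
            }
            // else: block recovery started; keep the lease for now
          } catch (IOException e) {
            removable.add(p); // unrecoverable; drop it, as checkLeases does
          }
        }
        return removable;
      }
    }

diff --git a/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java b/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java new file mode 100644 index 00000000..2913d64e --- /dev/null +++ b/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; +import java.util.ArrayList; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +/** + * BlockRecoveryCommand is an instruction to a data-node to recover + * the specified blocks. + * + * The data-node that receives this command treats itself as a primary + * data-node in the recovery process. + * + * Block recovery is identified by a recoveryId, which is also the new + * generation stamp, which the block will have after the recovery succeeds.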
*/ +public class BlockRecoveryCommand extends DatanodeCommand { + Collection<RecoveringBlock> recoveringBlocks; + + /** + * This is a block with locations from which it should be recovered + * and the new generation stamp, which the block will have after + * successful recovery. + * + * The new generation stamp of the block also plays the role of the recovery id. + */ + public static class RecoveringBlock extends LocatedBlock { + private long newGenerationStamp; + + /** + * Create empty RecoveringBlock. + */ + public RecoveringBlock() { + super(); + newGenerationStamp = -1L; + } + + /** + * Create RecoveringBlock. + */ + public RecoveringBlock(Block b, DatanodeInfo[] locs, long newGS) { + super(b, locs, -1, false); // startOffset is unknown + this.newGenerationStamp = newGS; + } + + /** + * Return the new generation stamp of the block, + * which also plays the role of the recovery id. + */ + public long getNewGenerationStamp() { + return newGenerationStamp; + } + + /////////////////////////////////////////// + // Writable + /////////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory + (RecoveringBlock.class, + new WritableFactory() { + public Writable newInstance() { return new RecoveringBlock(); } + }); + } + + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeLong(newGenerationStamp); + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + newGenerationStamp = in.readLong(); + } + } + + /** + * Create empty BlockRecoveryCommand. + */ + public BlockRecoveryCommand() { + this(0); + } + + /** + * Create BlockRecoveryCommand with + * the specified capacity for recovering blocks. + */ + public BlockRecoveryCommand(int capacity) { + super(DatanodeProtocol.DNA_RECOVERBLOCK); + recoveringBlocks = new ArrayList<RecoveringBlock>(capacity); + } + + /** + * Return the list of recovering blocks. + */ + public Collection<RecoveringBlock> getRecoveringBlocks() { + return recoveringBlocks; + } + + /** + * Add recovering block to the command. + */ + public void add(RecoveringBlock block) { + recoveringBlocks.add(block); + } + + /////////////////////////////////////////// + // Writable + /////////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory + (BlockRecoveryCommand.class, + new WritableFactory() { + public Writable newInstance() { return new BlockRecoveryCommand(); } + }); + } + + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeInt(recoveringBlocks.size()); + for(RecoveringBlock block : recoveringBlocks) { + block.write(out); + } + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + int numBlocks = in.readInt(); + recoveringBlocks = new ArrayList<RecoveringBlock>(numBlocks); + for(int i = 0; i < numBlocks; i++) { + RecoveringBlock b = new RecoveringBlock(); + b.readFields(in); + add(b); + } + } +}
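Because BlockRecoveryCommand rides back to the datanode in the heartbeat reply, it has to survive a Writable round trip. A quick self-check sketch (not part of the patch; it assumes only the constructors and accessors defined above and the usual Writable plumbing in LocatedBlock):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;

    class BlockRecoveryCommandRoundTrip {
      public static void main(String[] args) throws IOException {
        BlockRecoveryCommand cmd = new BlockRecoveryCommand(1);
        cmd.add(new RecoveringBlock(
            new Block(1, 1024, 100), new DatanodeInfo[0], 101 /* recovery id */));

        // Serialize and deserialize through the Writable interface.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        cmd.write(new DataOutputStream(buf));
        BlockRecoveryCommand copy = new BlockRecoveryCommand();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));

        RecoveringBlock rb = copy.getRecoveringBlocks().iterator().next();
        System.out.println("new GS after round trip: "
            + rb.getNewGenerationStamp()); // expect 101
      }
    }

diff --git a/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 5f465fd1..96787817 100644 --- a/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -35,9 +35,9 @@ **********************************************************************/ public interface DatanodeProtocol extends VersionedProtocol { /** - * 21: blockReport() includes under-construction replicas. + * 22: BlockRecoveryCommand introduced in reply to sendHeartbeat().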
*/ - public static final long versionID = 21L; + public static final long versionID = 22L; // error code final static int NOTIFY = 0; diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java index 2023db94..212ef3c3 100644 --- a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java +++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java @@ -18,12 +18,10 @@ package org.apache.hadoop.hdfs; import java.io.IOException; -import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -34,6 +32,7 @@ public class TestLeaseRecovery extends junit.framework.TestCase { static final int BLOCK_SIZE = 1024; static final short REPLICATION_NUM = (short)3; + private static final long LEASE_PERIOD = 300L; static void checkMetaInfo(Block b, InterDatanodeProtocol idp ) throws IOException { @@ -50,6 +49,15 @@ static int min(Integer... x) { return m; } + void waitLeaseRecovery(MiniDFSCluster cluster) { + cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD); + // wait for the lease to expire + try { + Thread.sleep(2 * 3000); // 2 heartbeat intervals + } catch (InterruptedException e) { + } + } + /** * The following test first creates a file with a few blocks. * It randomly truncates the replica of the last block stored in each datanode. @@ -96,44 +104,22 @@ public void testBlockSynchronization() throws Exception { checkMetaInfo(lastblock, idps[i]); } - //setup random block sizes - int lastblocksize = ORG_FILE_SIZE % BLOCK_SIZE; - Integer[] newblocksizes = new Integer[REPLICATION_NUM]; - for(int i = 0; i < REPLICATION_NUM; i++) { - newblocksizes[i] = AppendTestUtil.nextInt(lastblocksize); - } - DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes)); DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName); cluster.getNameNode().append(filestr, dfs.dfs.clientName); - //update blocks with random block sizes - long newGS = cluster.getNameNode().nextGenerationStamp(lastblock); - Block[] newblocks = new Block[REPLICATION_NUM]; - for(int i = 0; i < REPLICATION_NUM; i++) { - newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i], - newGS); - idps[i].updateBlock(lastblock, newblocks[i], false); - checkMetaInfo(newblocks[i], idps[i]); - } - cluster.getNameNode().commitBlockSynchronization(lastblock, newGS, - lastblocksize, false, false, new DatanodeID[]{}); - - //block synchronization - final int primarydatanodeindex = AppendTestUtil.nextInt(datanodes.length); - DataNode.LOG.info("primarydatanodeindex =" + primarydatanodeindex); - DataNode primary = datanodes[primarydatanodeindex]; - DataNode.LOG.info("primary.dnRegistration=" + primary.dnRegistration); - primary.recoverBlocks(new Block[]{lastblock}, new DatanodeInfo[][]{datanodeinfos}).join(); + // expire lease to trigger block recovery. 
+ waitLeaseRecovery(cluster); BlockMetaDataInfo[] updatedmetainfo = new BlockMetaDataInfo[REPLICATION_NUM]; - int minsize = min(newblocksizes); - long currentGS = cluster.getNamesystem().getGenerationStamp(); - lastblock.setGenerationStamp(currentGS); + long oldSize = lastblock.getNumBytes(); + lastblock = TestInterDatanodeProtocol.getLastLocatedBlock( + dfs.dfs.getNamenode(), filestr).getBlock(); + long currentGS = lastblock.getGenerationStamp(); for(int i = 0; i < REPLICATION_NUM; i++) { updatedmetainfo[i] = idps[i].getBlockMetaDataInfo(lastblock); assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId()); - assertEquals(minsize, updatedmetainfo[i].getNumBytes()); + assertEquals(oldSize, updatedmetainfo[i].getNumBytes()); assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp()); } } diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java index 3fd6cdfb..f512bd2d 100644 --- a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java +++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java @@ -56,6 +56,7 @@ public void testBlockSynchronization() throws Exception { // conf.setInt("io.bytes.per.checksum", 16); MiniDFSCluster cluster = null; + DistributedFileSystem dfs = null; byte[] actual = new byte[FILE_SIZE]; try { @@ -63,7 +64,7 @@ public void testBlockSynchronization() throws Exception { cluster.waitActive(); //create a file - DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem(); + dfs = (DistributedFileSystem)cluster.getFileSystem(); // create a random file name String filestr = "/foo" + AppendTestUtil.nextInt(); System.out.println("filestr=" + filestr); @@ -129,10 +130,9 @@ else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) { + "Validating its contents now..."); // verify that file-size matches + long fileSize = dfs.getFileStatus(filepath).getLen(); assertTrue("File should be " + size + " bytes, but is actually " + - " found to be " + dfs.getFileStatus(filepath).getLen() + - " bytes", - dfs.getFileStatus(filepath).getLen() == size); + " found to be " + fileSize + " bytes", fileSize == size); // verify that there is enough data to read. System.out.println("File size is good. Now validating sizes from datanodes..."); @@ -142,6 +142,7 @@ else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) { } finally { try { + if(dfs != null) dfs.close(); if (cluster != null) {cluster.shutdown();} } catch (Exception e) { // ignore diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java index 2f73f618..b48380ef 100644 --- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java +++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery.Info; import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;