From a16bfff71bd7f00e06e1f59bfe5445a154bb8c66 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Tue, 24 Mar 2015 11:06:13 -0700 Subject: [PATCH] HDFS-7854. Separate class DataStreamer out of DFSOutputStream. Contributed by Li Bo. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../dev-support/findbugsExcludeFile.xml | 2 +- .../apache/hadoop/hdfs/DFSOutputStream.java | 1694 ++-------------- .../org/apache/hadoop/hdfs/DataStreamer.java | 1754 +++++++++++++++++ .../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +- .../hadoop/hdfs/TestDFSOutputStream.java | 5 +- .../apache/hadoop/hdfs/TestFileCreation.java | 18 +- 7 files changed, 1893 insertions(+), 1585 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 5dae029f5a272..4ec0891808ba8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -332,6 +332,9 @@ Release 2.8.0 - UNRELEASED HDFS-7829. Code clean up for LocatedBlock. (Takanobu Asanuma via jing9) + HDFS-7854. Separate class DataStreamer out of DFSOutputStream. (Li Bo via + jing9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index dedeeced0ccb9..224d2fb5defda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -77,7 +77,7 @@ ResponseProccessor is thread that is designed to catch RuntimeException. --> - + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index d7d59af784072..ee3e6f6fe86a8 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -17,29 +17,12 @@ */ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; - -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.EnumSet; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -52,64 +35,37 @@ import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; -import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; -import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; -import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.io.EnumSetWritable; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; -import org.apache.htrace.NullScope; import org.apache.htrace.Sampler; -import org.apache.htrace.Span; import org.apache.htrace.Trace; -import org.apache.htrace.TraceInfo; import org.apache.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; /**************************************************************** @@ -121,19 +77,11 @@ * is typically 512 bytes and has an associated checksum with it. * * When a client application fills up the currentPacket, it is - * enqueued into dataQueue. The DataStreamer thread picks up - * packets from the dataQueue, sends it to the first datanode in - * the pipeline and moves it from the dataQueue to the ackQueue. - * The ResponseProcessor receives acks from the datanodes. When an - * successful ack for a packet is received from all datanodes, the - * ResponseProcessor removes the corresponding packet from the - * ackQueue. + * enqueued into the dataQueue of DataStreamer. DataStreamer is a + * thread that picks up packets from the dataQueue and sends it to + * the first datanode in the pipeline. * - * In case of error, all outstanding packets and moved from - * ackQueue. A new pipeline is setup by eliminating the bad - * datanode from the original pipeline. The DataStreamer now - * starts sending packets from the dataQueue. -****************************************************************/ + ****************************************************************/ @InterfaceAudience.Private public class DFSOutputStream extends FSOutputSummer implements Syncable, CanSetDropBehind { @@ -148,45 +96,25 @@ public class DFSOutputStream extends FSOutputSummer CryptoProtocolVersion.supported(); private final DFSClient dfsClient; - private final long dfsclientSlowLogThresholdMs; private final ByteArrayManager byteArrayManager; - private Socket s; // closed is accessed by different threads under different locks. private volatile boolean closed = false; - private String src; + private final String src; private final long fileId; private final long blockSize; - /** Only for DataTransferProtocol.writeBlock(..) */ - private final DataChecksum checksum4WriteBlock; - private final int bytesPerChecksum; + private final int bytesPerChecksum; - // both dataQueue and ackQueue are protected by dataQueue lock - private final LinkedList dataQueue = new LinkedList(); - private final LinkedList ackQueue = new LinkedList(); private DFSPacket currentPacket = null; private DataStreamer streamer; - private long currentSeqno = 0; - private long lastQueuedSeqno = -1; - private long lastAckedSeqno = -1; - private long bytesCurBlock = 0; // bytes written in current block private int packetSize = 0; // write packet size, not including the header. private int chunksPerPacket = 0; - private final AtomicReference lastException = new AtomicReference(); - private long artificialSlowdown = 0; private long lastFlushOffset = 0; // offset when flush was invoked - //persist blocks on namenode - private final AtomicBoolean persistBlocks = new AtomicBoolean(false); - private volatile boolean appendChunk = false; // appending to existing partial block private long initialFileSize = 0; // at time of file open - private final Progressable progress; private final short blockReplication; // replication factor of file private boolean shouldSyncBlock = false; // force blocks to disk upon close private final AtomicReference cachingStrategy; - private boolean failPacket = false; private FileEncryptionInfo fileEncryptionInfo; - private static final BlockStoragePolicySuite blockStoragePolicySuite = - BlockStoragePolicySuite.createDefaultSuite(); /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, @@ -207,1326 +135,10 @@ private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBl getChecksumSize(), lastPacketInBlock); } - /** - * For heartbeat packets, create buffer directly by new byte[] - * since heartbeats should not be blocked. - */ - private DFSPacket createHeartbeatPacket() throws InterruptedIOException { - final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN]; - return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, - getChecksumSize(), false); - } - - - // - // The DataStreamer class is responsible for sending data packets to the - // datanodes in the pipeline. It retrieves a new blockid and block locations - // from the namenode, and starts streaming packets to the pipeline of - // Datanodes. Every packet has a sequence number associated with - // it. When all the packets for a block are sent out and acks for each - // if them are received, the DataStreamer closes the current block. - // - class DataStreamer extends Daemon { - private volatile boolean streamerClosed = false; - private ExtendedBlock block; // its length is number of bytes acked - private Token accessToken; - private DataOutputStream blockStream; - private DataInputStream blockReplyStream; - private ResponseProcessor response = null; - private volatile DatanodeInfo[] nodes = null; // list of targets for current block - private volatile StorageType[] storageTypes = null; - private volatile String[] storageIDs = null; - private final LoadingCache excludedNodes = - CacheBuilder.newBuilder() - .expireAfterWrite( - dfsClient.getConf().excludedNodesCacheExpiry, - TimeUnit.MILLISECONDS) - .removalListener(new RemovalListener() { - @Override - public void onRemoval( - RemovalNotification notification) { - DFSClient.LOG.info("Removing node " + - notification.getKey() + " from the excluded nodes list"); - } - }) - .build(new CacheLoader() { - @Override - public DatanodeInfo load(DatanodeInfo key) throws Exception { - return key; - } - }); - private String[] favoredNodes; - volatile boolean hasError = false; - volatile int errorIndex = -1; - // Restarting node index - AtomicInteger restartingNodeIndex = new AtomicInteger(-1); - private long restartDeadline = 0; // Deadline of DN restart - private BlockConstructionStage stage; // block construction stage - private long bytesSent = 0; // number of bytes that've been sent - private final boolean isLazyPersistFile; - - /** Nodes have been used in the pipeline before and have failed. */ - private final List failed = new ArrayList(); - /** The last ack sequence number before pipeline failure. */ - private long lastAckedSeqnoBeforeFailure = -1; - private int pipelineRecoveryCount = 0; - /** Has the current block been hflushed? */ - private boolean isHflushed = false; - /** Append on an existing block? */ - private final boolean isAppend; - - private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) { - isAppend = false; - isLazyPersistFile = isLazyPersist(stat); - this.block = block; - stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; - } - - /** - * Construct a data streamer for appending to the last partial block - * @param lastBlock last block of the file to be appended - * @param stat status of the file to be appended - * @param bytesPerChecksum number of bytes per checksum - * @throws IOException if error occurs - */ - private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, - int bytesPerChecksum) throws IOException { - isAppend = true; - stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; - block = lastBlock.getBlock(); - bytesSent = block.getNumBytes(); - accessToken = lastBlock.getBlockToken(); - isLazyPersistFile = isLazyPersist(stat); - long usedInLastBlock = stat.getLen() % blockSize; - int freeInLastBlock = (int)(blockSize - usedInLastBlock); - - // calculate the amount of free space in the pre-existing - // last crc chunk - int usedInCksum = (int)(stat.getLen() % bytesPerChecksum); - int freeInCksum = bytesPerChecksum - usedInCksum; - - // if there is space in the last block, then we have to - // append to that block - if (freeInLastBlock == blockSize) { - throw new IOException("The last block for file " + - src + " is full."); - } - - if (usedInCksum > 0 && freeInCksum > 0) { - // if there is space in the last partial chunk, then - // setup in such a way that the next packet will have only - // one chunk that fills up the partial chunk. - // - computePacketChunkSize(0, freeInCksum); - setChecksumBufSize(freeInCksum); - appendChunk = true; - } else { - // if the remaining space in the block is smaller than - // that expected size of of a packet, then create - // smaller size packet. - // - computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock), - bytesPerChecksum); - } - - // setup pipeline to append to the last block XXX retries?? - setPipeline(lastBlock); - errorIndex = -1; // no errors yet. - if (nodes.length < 1) { - throw new IOException("Unable to retrieve blocks locations " + - " for last block " + block + - "of file " + src); - - } - } - - private void setPipeline(LocatedBlock lb) { - setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); - } - private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, - String[] storageIDs) { - this.nodes = nodes; - this.storageTypes = storageTypes; - this.storageIDs = storageIDs; - } - - private void setFavoredNodes(String[] favoredNodes) { - this.favoredNodes = favoredNodes; - } - - /** - * Initialize for data streaming - */ - private void initDataStreaming() { - this.setName("DataStreamer for file " + src + - " block " + block); - response = new ResponseProcessor(nodes); - response.start(); - stage = BlockConstructionStage.DATA_STREAMING; - } - - private void endBlock() { - if(DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Closing old block " + block); - } - this.setName("DataStreamer for file " + src); - closeResponder(); - closeStream(); - setPipeline(null, null, null); - stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; - } - - /* - * streamer thread is the only thread that opens streams to datanode, - * and closes them. Any error recovery is also done by this thread. - */ - @Override - public void run() { - long lastPacket = Time.monotonicNow(); - TraceScope scope = NullScope.INSTANCE; - while (!streamerClosed && dfsClient.clientRunning) { - // if the Responder encountered an error, shutdown Responder - if (hasError && response != null) { - try { - response.close(); - response.join(); - response = null; - } catch (InterruptedException e) { - DFSClient.LOG.warn("Caught exception ", e); - } - } - - DFSPacket one; - try { - // process datanode IO errors if any - boolean doSleep = false; - if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) { - doSleep = processDatanodeError(); - } - - synchronized (dataQueue) { - // wait for a packet to be sent. - long now = Time.monotonicNow(); - while ((!streamerClosed && !hasError && dfsClient.clientRunning - && dataQueue.size() == 0 && - (stage != BlockConstructionStage.DATA_STREAMING || - stage == BlockConstructionStage.DATA_STREAMING && - now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) { - long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket); - timeout = timeout <= 0 ? 1000 : timeout; - timeout = (stage == BlockConstructionStage.DATA_STREAMING)? - timeout : 1000; - try { - dataQueue.wait(timeout); - } catch (InterruptedException e) { - DFSClient.LOG.warn("Caught exception ", e); - } - doSleep = false; - now = Time.monotonicNow(); - } - if (streamerClosed || hasError || !dfsClient.clientRunning) { - continue; - } - // get packet to be sent. - if (dataQueue.isEmpty()) { - one = createHeartbeatPacket(); - assert one != null; - } else { - one = dataQueue.getFirst(); // regular data packet - long parents[] = one.getTraceParents(); - if (parents.length > 0) { - scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0])); - // TODO: use setParents API once it's available from HTrace 3.2 -// scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS); -// scope.getSpan().setParents(parents); - } - } - } - - // get new block from namenode. - if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { - if(DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Allocating new block"); - } - setPipeline(nextBlockOutputStream()); - initDataStreaming(); - } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { - if(DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Append to block " + block); - } - setupPipelineForAppendOrRecovery(); - initDataStreaming(); - } - - long lastByteOffsetInBlock = one.getLastByteOffsetBlock(); - if (lastByteOffsetInBlock > blockSize) { - throw new IOException("BlockSize " + blockSize + - " is smaller than data size. " + - " Offset of packet in block " + - lastByteOffsetInBlock + - " Aborting file " + src); - } - - if (one.isLastPacketInBlock()) { - // wait for all data packets have been successfully acked - synchronized (dataQueue) { - while (!streamerClosed && !hasError && - ackQueue.size() != 0 && dfsClient.clientRunning) { - try { - // wait for acks to arrive from datanodes - dataQueue.wait(1000); - } catch (InterruptedException e) { - DFSClient.LOG.warn("Caught exception ", e); - } - } - } - if (streamerClosed || hasError || !dfsClient.clientRunning) { - continue; - } - stage = BlockConstructionStage.PIPELINE_CLOSE; - } - - // send the packet - Span span = null; - synchronized (dataQueue) { - // move packet from dataQueue to ackQueue - if (!one.isHeartbeatPacket()) { - span = scope.detach(); - one.setTraceSpan(span); - dataQueue.removeFirst(); - ackQueue.addLast(one); - dataQueue.notifyAll(); - } - } - - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("DataStreamer block " + block + - " sending packet " + one); - } - - // write out data to remote datanode - TraceScope writeScope = Trace.startSpan("writeTo", span); - try { - one.writeTo(blockStream); - blockStream.flush(); - } catch (IOException e) { - // HDFS-3398 treat primary DN is down since client is unable to - // write to primary DN. If a failed or restarting node has already - // been recorded by the responder, the following call will have no - // effect. Pipeline recovery can handle only one node error at a - // time. If the primary node fails again during the recovery, it - // will be taken out then. - tryMarkPrimaryDatanodeFailed(); - throw e; - } finally { - writeScope.close(); - } - lastPacket = Time.monotonicNow(); - - // update bytesSent - long tmpBytesSent = one.getLastByteOffsetBlock(); - if (bytesSent < tmpBytesSent) { - bytesSent = tmpBytesSent; - } - - if (streamerClosed || hasError || !dfsClient.clientRunning) { - continue; - } - - // Is this block full? - if (one.isLastPacketInBlock()) { - // wait for the close packet has been acked - synchronized (dataQueue) { - while (!streamerClosed && !hasError && - ackQueue.size() != 0 && dfsClient.clientRunning) { - dataQueue.wait(1000);// wait for acks to arrive from datanodes - } - } - if (streamerClosed || hasError || !dfsClient.clientRunning) { - continue; - } - - endBlock(); - } - if (progress != null) { progress.progress(); } - - // This is used by unit test to trigger race conditions. - if (artificialSlowdown != 0 && dfsClient.clientRunning) { - Thread.sleep(artificialSlowdown); - } - } catch (Throwable e) { - // Log warning if there was a real error. - if (restartingNodeIndex.get() == -1) { - // Since their messages are descriptive enough, do not always - // log a verbose stack-trace WARN for quota exceptions. - if (e instanceof QuotaExceededException) { - DFSClient.LOG.debug("DataStreamer Quota Exception", e); - } else { - DFSClient.LOG.warn("DataStreamer Exception", e); - } - } - if (e instanceof IOException) { - setLastException((IOException)e); - } else { - setLastException(new IOException("DataStreamer Exception: ",e)); - } - hasError = true; - if (errorIndex == -1 && restartingNodeIndex.get() == -1) { - // Not a datanode issue - streamerClosed = true; - } - } finally { - scope.close(); - } - } - closeInternal(); - } - - private void closeInternal() { - closeResponder(); // close and join - closeStream(); - streamerClosed = true; - setClosed(); - synchronized (dataQueue) { - dataQueue.notifyAll(); - } - } - - /* - * close both streamer and DFSOutputStream, should be called only - * by an external thread and only after all data to be sent has - * been flushed to datanode. - * - * Interrupt this data streamer if force is true - * - * @param force if this data stream is forced to be closed - */ - void close(boolean force) { - streamerClosed = true; - synchronized (dataQueue) { - dataQueue.notifyAll(); - } - if (force) { - this.interrupt(); - } - } - - private void closeResponder() { - if (response != null) { - try { - response.close(); - response.join(); - } catch (InterruptedException e) { - DFSClient.LOG.warn("Caught exception ", e); - } finally { - response = null; - } - } - } - - private void closeStream() { - if (blockStream != null) { - try { - blockStream.close(); - } catch (IOException e) { - setLastException(e); - } finally { - blockStream = null; - } - } - if (blockReplyStream != null) { - try { - blockReplyStream.close(); - } catch (IOException e) { - setLastException(e); - } finally { - blockReplyStream = null; - } - } - if (null != s) { - try { - s.close(); - } catch (IOException e) { - setLastException(e); - } finally { - s = null; - } - } - } - - // The following synchronized methods are used whenever - // errorIndex or restartingNodeIndex is set. This is because - // check & set needs to be atomic. Simply reading variables - // does not require a synchronization. When responder is - // not running (e.g. during pipeline recovery), there is no - // need to use these methods. - - /** Set the error node index. Called by responder */ - synchronized void setErrorIndex(int idx) { - errorIndex = idx; - } - - /** Set the restarting node index. Called by responder */ - synchronized void setRestartingNodeIndex(int idx) { - restartingNodeIndex.set(idx); - // If the data streamer has already set the primary node - // bad, clear it. It is likely that the write failed due to - // the DN shutdown. Even if it was a real failure, the pipeline - // recovery will take care of it. - errorIndex = -1; - } - - /** - * This method is used when no explicit error report was received, - * but something failed. When the primary node is a suspect or - * unsure about the cause, the primary node is marked as failed. - */ - synchronized void tryMarkPrimaryDatanodeFailed() { - // There should be no existing error and no ongoing restart. - if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) { - errorIndex = 0; - } - } - - /** - * Examine whether it is worth waiting for a node to restart. - * @param index the node index - */ - boolean shouldWaitForRestart(int index) { - // Only one node in the pipeline. - if (nodes.length == 1) { - return true; - } - - // Is it a local node? - InetAddress addr = null; - try { - addr = InetAddress.getByName(nodes[index].getIpAddr()); - } catch (java.net.UnknownHostException e) { - // we are passing an ip address. this should not happen. - assert false; - } - - if (addr != null && NetUtils.isLocalAddress(addr)) { - return true; - } - return false; - } - - // - // Processes responses from the datanodes. A packet is removed - // from the ackQueue when its response arrives. - // - private class ResponseProcessor extends Daemon { - - private volatile boolean responderClosed = false; - private DatanodeInfo[] targets = null; - private boolean isLastPacketInBlock = false; - - ResponseProcessor (DatanodeInfo[] targets) { - this.targets = targets; - } - - @Override - public void run() { - - setName("ResponseProcessor for block " + block); - PipelineAck ack = new PipelineAck(); - - TraceScope scope = NullScope.INSTANCE; - while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) { - // process responses from datanodes. - try { - // read an ack from the pipeline - long begin = Time.monotonicNow(); - ack.readFields(blockReplyStream); - long duration = Time.monotonicNow() - begin; - if (duration > dfsclientSlowLogThresholdMs - && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) { - DFSClient.LOG - .warn("Slow ReadProcessor read fields took " + duration - + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " - + ack + ", targets: " + Arrays.asList(targets)); - } else if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("DFSClient " + ack); - } - - long seqno = ack.getSeqno(); - // processes response status from datanodes. - for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) { - final Status reply = PipelineAck.getStatusFromHeader(ack - .getReply(i)); - // Restart will not be treated differently unless it is - // the local node or the only one in the pipeline. - if (PipelineAck.isRestartOOBStatus(reply) && - shouldWaitForRestart(i)) { - restartDeadline = dfsClient.getConf().datanodeRestartTimeout - + Time.monotonicNow(); - setRestartingNodeIndex(i); - String message = "A datanode is restarting: " + targets[i]; - DFSClient.LOG.info(message); - throw new IOException(message); - } - // node error - if (reply != SUCCESS) { - setErrorIndex(i); // first bad datanode - throw new IOException("Bad response " + reply + - " for block " + block + - " from datanode " + - targets[i]); - } - } - - assert seqno != PipelineAck.UNKOWN_SEQNO : - "Ack for unknown seqno should be a failed ack: " + ack; - if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack - continue; - } - - // a success ack for a data packet - DFSPacket one; - synchronized (dataQueue) { - one = ackQueue.getFirst(); - } - if (one.getSeqno() != seqno) { - throw new IOException("ResponseProcessor: Expecting seqno " + - " for block " + block + - one.getSeqno() + " but received " + seqno); - } - isLastPacketInBlock = one.isLastPacketInBlock(); - - // Fail the packet write for testing in order to force a - // pipeline recovery. - if (DFSClientFaultInjector.get().failPacket() && - isLastPacketInBlock) { - failPacket = true; - throw new IOException( - "Failing the last packet for testing."); - } - - // update bytesAcked - block.setNumBytes(one.getLastByteOffsetBlock()); - - synchronized (dataQueue) { - scope = Trace.continueSpan(one.getTraceSpan()); - one.setTraceSpan(null); - lastAckedSeqno = seqno; - ackQueue.removeFirst(); - dataQueue.notifyAll(); - - one.releaseBuffer(byteArrayManager); - } - } catch (Exception e) { - if (!responderClosed) { - if (e instanceof IOException) { - setLastException((IOException)e); - } - hasError = true; - // If no explicit error report was received, mark the primary - // node as failed. - tryMarkPrimaryDatanodeFailed(); - synchronized (dataQueue) { - dataQueue.notifyAll(); - } - if (restartingNodeIndex.get() == -1) { - DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception " - + " for block " + block, e); - } - responderClosed = true; - } - } finally { - scope.close(); - } - } - } - - void close() { - responderClosed = true; - this.interrupt(); - } - } - - // If this stream has encountered any errors so far, shutdown - // threads and mark stream as closed. Returns true if we should - // sleep for a while after returning from this call. - // - private boolean processDatanodeError() throws IOException { - if (response != null) { - DFSClient.LOG.info("Error Recovery for " + block + - " waiting for responder to exit. "); - return true; - } - closeStream(); - - // move packets from ack queue to front of the data queue - synchronized (dataQueue) { - dataQueue.addAll(0, ackQueue); - ackQueue.clear(); - } - - // Record the new pipeline failure recovery. - if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) { - lastAckedSeqnoBeforeFailure = lastAckedSeqno; - pipelineRecoveryCount = 1; - } else { - // If we had to recover the pipeline five times in a row for the - // same packet, this client likely has corrupt data or corrupting - // during transmission. - if (++pipelineRecoveryCount > 5) { - DFSClient.LOG.warn("Error recovering pipeline for writing " + - block + ". Already retried 5 times for the same packet."); - lastException.set(new IOException("Failing write. Tried pipeline " + - "recovery 5 times without success.")); - streamerClosed = true; - return false; - } - } - boolean doSleep = setupPipelineForAppendOrRecovery(); - - if (!streamerClosed && dfsClient.clientRunning) { - if (stage == BlockConstructionStage.PIPELINE_CLOSE) { - - // If we had an error while closing the pipeline, we go through a fast-path - // where the BlockReceiver does not run. Instead, the DataNode just finalizes - // the block immediately during the 'connect ack' process. So, we want to pull - // the end-of-block packet from the dataQueue, since we don't actually have - // a true pipeline to send it over. - // - // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that - // a client waiting on close() will be aware that the flush finished. - synchronized (dataQueue) { - DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet - Span span = endOfBlockPacket.getTraceSpan(); - if (span != null) { - // Close any trace span associated with this Packet - TraceScope scope = Trace.continueSpan(span); - scope.close(); - } - assert endOfBlockPacket.isLastPacketInBlock(); - assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1; - lastAckedSeqno = endOfBlockPacket.getSeqno(); - dataQueue.notifyAll(); - } - endBlock(); - } else { - initDataStreaming(); - } - } - - return doSleep; - } - - private void setHflush() { - isHflushed = true; - } - - private int findNewDatanode(final DatanodeInfo[] original - ) throws IOException { - if (nodes.length != original.length + 1) { - throw new IOException( - new StringBuilder() - .append("Failed to replace a bad datanode on the existing pipeline ") - .append("due to no more good datanodes being available to try. ") - .append("(Nodes: current=").append(Arrays.asList(nodes)) - .append(", original=").append(Arrays.asList(original)).append("). ") - .append("The current failed datanode replacement policy is ") - .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ") - .append("a client may configure this via '") - .append(DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY) - .append("' in its configuration.") - .toString()); - } - for(int i = 0; i < nodes.length; i++) { - int j = 0; - for(; j < original.length && !nodes[i].equals(original[j]); j++); - if (j == original.length) { - return i; - } - } - throw new IOException("Failed: new datanode not found: nodes=" - + Arrays.asList(nodes) + ", original=" + Arrays.asList(original)); - } - - private void addDatanode2ExistingPipeline() throws IOException { - if (DataTransferProtocol.LOG.isDebugEnabled()) { - DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno); - } - /* - * Is data transfer necessary? We have the following cases. - * - * Case 1: Failure in Pipeline Setup - * - Append - * + Transfer the stored replica, which may be a RBW or a finalized. - * - Create - * + If no data, then no transfer is required. - * + If there are data written, transfer RBW. This case may happens - * when there are streaming failure earlier in this pipeline. - * - * Case 2: Failure in Streaming - * - Append/Create: - * + transfer RBW - * - * Case 3: Failure in Close - * - Append/Create: - * + no transfer, let NameNode replicates the block. - */ - if (!isAppend && lastAckedSeqno < 0 - && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { - //no data have been written - return; - } else if (stage == BlockConstructionStage.PIPELINE_CLOSE - || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { - //pipeline is closing - return; - } - - //get a new datanode - final DatanodeInfo[] original = nodes; - final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode( - src, fileId, block, nodes, storageIDs, - failed.toArray(new DatanodeInfo[failed.size()]), - 1, dfsClient.clientName); - setPipeline(lb); - - //find the new datanode - final int d = findNewDatanode(original); - - //transfer replica - final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1]; - final DatanodeInfo[] targets = {nodes[d]}; - final StorageType[] targetStorageTypes = {storageTypes[d]}; - transfer(src, targets, targetStorageTypes, lb.getBlockToken()); - } - - private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes, - final Token blockToken) throws IOException { - //transfer replica to the new datanode - Socket sock = null; - DataOutputStream out = null; - DataInputStream in = null; - try { - sock = createSocketForPipeline(src, 2, dfsClient); - final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); - - OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); - InputStream unbufIn = NetUtils.getInputStream(sock); - IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock, - unbufOut, unbufIn, dfsClient, blockToken, src); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsConstants.SMALL_BUFFER_SIZE)); - in = new DataInputStream(unbufIn); - - //send the TRANSFER_BLOCK request - new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, - targets, targetStorageTypes); - out.flush(); - - //ack - BlockOpResponseProto response = - BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); - if (SUCCESS != response.getStatus()) { - throw new IOException("Failed to add a datanode"); - } - } finally { - IOUtils.closeStream(in); - IOUtils.closeStream(out); - IOUtils.closeSocket(sock); - } - } - - /** - * Open a DataOutputStream to a DataNode pipeline so that - * it can be written to. - * This happens when a file is appended or data streaming fails - * It keeps on trying until a pipeline is setup - */ - private boolean setupPipelineForAppendOrRecovery() throws IOException { - // check number of datanodes - if (nodes == null || nodes.length == 0) { - String msg = "Could not get block locations. " + "Source file \"" - + src + "\" - Aborting..."; - DFSClient.LOG.warn(msg); - setLastException(new IOException(msg)); - streamerClosed = true; - return false; - } - - boolean success = false; - long newGS = 0L; - while (!success && !streamerClosed && dfsClient.clientRunning) { - // Sleep before reconnect if a dn is restarting. - // This process will be repeated until the deadline or the datanode - // starts back up. - if (restartingNodeIndex.get() >= 0) { - // 4 seconds or the configured deadline period, whichever is shorter. - // This is the retry interval and recovery will be retried in this - // interval until timeout or success. - long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout, - 4000L); - try { - Thread.sleep(delay); - } catch (InterruptedException ie) { - lastException.set(new IOException("Interrupted while waiting for " + - "datanode to restart. " + nodes[restartingNodeIndex.get()])); - streamerClosed = true; - return false; - } - } - boolean isRecovery = hasError; - // remove bad datanode from list of datanodes. - // If errorIndex was not set (i.e. appends), then do not remove - // any datanodes - // - if (errorIndex >= 0) { - StringBuilder pipelineMsg = new StringBuilder(); - for (int j = 0; j < nodes.length; j++) { - pipelineMsg.append(nodes[j]); - if (j < nodes.length - 1) { - pipelineMsg.append(", "); - } - } - if (nodes.length <= 1) { - lastException.set(new IOException("All datanodes " + pipelineMsg - + " are bad. Aborting...")); - streamerClosed = true; - return false; - } - DFSClient.LOG.warn("Error Recovery for block " + block + - " in pipeline " + pipelineMsg + - ": bad datanode " + nodes[errorIndex]); - failed.add(nodes[errorIndex]); - - DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1]; - arraycopy(nodes, newnodes, errorIndex); - - final StorageType[] newStorageTypes = new StorageType[newnodes.length]; - arraycopy(storageTypes, newStorageTypes, errorIndex); - - final String[] newStorageIDs = new String[newnodes.length]; - arraycopy(storageIDs, newStorageIDs, errorIndex); - - setPipeline(newnodes, newStorageTypes, newStorageIDs); - - // Just took care of a node error while waiting for a node restart - if (restartingNodeIndex.get() >= 0) { - // If the error came from a node further away than the restarting - // node, the restart must have been complete. - if (errorIndex > restartingNodeIndex.get()) { - restartingNodeIndex.set(-1); - } else if (errorIndex < restartingNodeIndex.get()) { - // the node index has shifted. - restartingNodeIndex.decrementAndGet(); - } else { - // this shouldn't happen... - assert false; - } - } - - if (restartingNodeIndex.get() == -1) { - hasError = false; - } - lastException.set(null); - errorIndex = -1; - } - - // Check if replace-datanode policy is satisfied. - if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication, - nodes, isAppend, isHflushed)) { - try { - addDatanode2ExistingPipeline(); - } catch(IOException ioe) { - if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) { - throw ioe; - } - DFSClient.LOG.warn("Failed to replace datanode." - + " Continue with the remaining datanodes since " - + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY - + " is set to true.", ioe); - } - } - - // get a new generation stamp and an access token - LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName); - newGS = lb.getBlock().getGenerationStamp(); - accessToken = lb.getBlockToken(); - - // set up the pipeline again with the remaining nodes - if (failPacket) { // for testing - success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); - failPacket = false; - try { - // Give DNs time to send in bad reports. In real situations, - // good reports should follow bad ones, if client committed - // with those nodes. - Thread.sleep(2000); - } catch (InterruptedException ie) {} - } else { - success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); - } - - if (restartingNodeIndex.get() >= 0) { - assert hasError == true; - // check errorIndex set above - if (errorIndex == restartingNodeIndex.get()) { - // ignore, if came from the restarting node - errorIndex = -1; - } - // still within the deadline - if (Time.monotonicNow() < restartDeadline) { - continue; // with in the deadline - } - // expired. declare the restarting node dead - restartDeadline = 0; - int expiredNodeIndex = restartingNodeIndex.get(); - restartingNodeIndex.set(-1); - DFSClient.LOG.warn("Datanode did not restart in time: " + - nodes[expiredNodeIndex]); - // Mark the restarting node as failed. If there is any other failed - // node during the last pipeline construction attempt, it will not be - // overwritten/dropped. In this case, the restarting node will get - // excluded in the following attempt, if it still does not come up. - if (errorIndex == -1) { - errorIndex = expiredNodeIndex; - } - // From this point on, normal pipeline recovery applies. - } - } // while - - if (success) { - // update pipeline at the namenode - ExtendedBlock newBlock = new ExtendedBlock( - block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); - dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, - nodes, storageIDs); - // update client side generation stamp - block = newBlock; - } - return false; // do not sleep, continue processing - } - - /** - * Open a DataOutputStream to a DataNode so that it can be written to. - * This happens when a file is created and each time a new block is allocated. - * Must get block ID and the IDs of the destinations from the namenode. - * Returns the list of target datanodes. - */ - private LocatedBlock nextBlockOutputStream() throws IOException { - LocatedBlock lb = null; - DatanodeInfo[] nodes = null; - StorageType[] storageTypes = null; - int count = dfsClient.getConf().nBlockWriteRetry; - boolean success = false; - ExtendedBlock oldBlock = block; - do { - hasError = false; - lastException.set(null); - errorIndex = -1; - success = false; - - DatanodeInfo[] excluded = - excludedNodes.getAllPresent(excludedNodes.asMap().keySet()) - .keySet() - .toArray(new DatanodeInfo[0]); - block = oldBlock; - lb = locateFollowingBlock(excluded.length > 0 ? excluded : null); - block = lb.getBlock(); - block.setNumBytes(0); - bytesSent = 0; - accessToken = lb.getBlockToken(); - nodes = lb.getLocations(); - storageTypes = lb.getStorageTypes(); - - // - // Connect to first DataNode in the list. - // - success = createBlockOutputStream(nodes, storageTypes, 0L, false); - - if (!success) { - DFSClient.LOG.info("Abandoning " + block); - dfsClient.namenode.abandonBlock(block, fileId, src, - dfsClient.clientName); - block = null; - DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]); - excludedNodes.put(nodes[errorIndex], nodes[errorIndex]); - } - } while (!success && --count >= 0); - - if (!success) { - throw new IOException("Unable to create new block."); - } - return lb; - } - - // connects to the first datanode in the pipeline - // Returns true if success, otherwise return failure. - // - private boolean createBlockOutputStream(DatanodeInfo[] nodes, - StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { - if (nodes.length == 0) { - DFSClient.LOG.info("nodes are empty for write pipeline of block " - + block); - return false; - } - Status pipelineStatus = SUCCESS; - String firstBadLink = ""; - boolean checkRestart = false; - if (DFSClient.LOG.isDebugEnabled()) { - for (int i = 0; i < nodes.length; i++) { - DFSClient.LOG.debug("pipeline = " + nodes[i]); - } - } - - // persist blocks on namenode on next flush - persistBlocks.set(true); - - int refetchEncryptionKey = 1; - while (true) { - boolean result = false; - DataOutputStream out = null; - try { - assert null == s : "Previous socket unclosed"; - assert null == blockReplyStream : "Previous blockReplyStream unclosed"; - s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); - long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); - - OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout); - InputStream unbufIn = NetUtils.getInputStream(s); - IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s, - unbufOut, unbufIn, dfsClient, accessToken, nodes[0]); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsConstants.SMALL_BUFFER_SIZE)); - blockReplyStream = new DataInputStream(unbufIn); - - // - // Xmit header info to datanode - // - - BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage; - - // We cannot change the block length in 'block' as it counts the number - // of bytes ack'ed. - ExtendedBlock blockCopy = new ExtendedBlock(block); - blockCopy.setNumBytes(blockSize); - - boolean[] targetPinnings = getPinnings(nodes, true); - // send the request - new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken, - dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, - nodes.length, block.getNumBytes(), bytesSent, newGS, - checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile, - (targetPinnings == null ? false : targetPinnings[0]), targetPinnings); - - // receive ack for connect - BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - PBHelper.vintPrefixed(blockReplyStream)); - pipelineStatus = resp.getStatus(); - firstBadLink = resp.getFirstBadLink(); - - // Got an restart OOB ack. - // If a node is already restarting, this status is not likely from - // the same node. If it is from a different node, it is not - // from the local datanode. Thus it is safe to treat this as a - // regular node error. - if (PipelineAck.isRestartOOBStatus(pipelineStatus) && - restartingNodeIndex.get() == -1) { - checkRestart = true; - throw new IOException("A datanode is restarting."); - } - - String logInfo = "ack with firstBadLink as " + firstBadLink; - DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo); - - assert null == blockStream : "Previous blockStream unclosed"; - blockStream = out; - result = true; // success - restartingNodeIndex.set(-1); - hasError = false; - } catch (IOException ie) { - if (restartingNodeIndex.get() == -1) { - DFSClient.LOG.info("Exception in createBlockOutputStream", ie); - } - if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { - DFSClient.LOG.info("Will fetch a new encryption key and retry, " - + "encryption key was invalid when connecting to " - + nodes[0] + " : " + ie); - // The encryption key used is invalid. - refetchEncryptionKey--; - dfsClient.clearDataEncryptionKey(); - // Don't close the socket/exclude this node just yet. Try again with - // a new encryption key. - continue; - } - - // find the datanode that matches - if (firstBadLink.length() != 0) { - for (int i = 0; i < nodes.length; i++) { - // NB: Unconditionally using the xfer addr w/o hostname - if (firstBadLink.equals(nodes[i].getXferAddr())) { - errorIndex = i; - break; - } - } - } else { - assert checkRestart == false; - errorIndex = 0; - } - // Check whether there is a restart worth waiting for. - if (checkRestart && shouldWaitForRestart(errorIndex)) { - restartDeadline = dfsClient.getConf().datanodeRestartTimeout + - Time.monotonicNow(); - restartingNodeIndex.set(errorIndex); - errorIndex = -1; - DFSClient.LOG.info("Waiting for the datanode to be restarted: " + - nodes[restartingNodeIndex.get()]); - } - hasError = true; - setLastException(ie); - result = false; // error - } finally { - if (!result) { - IOUtils.closeSocket(s); - s = null; - IOUtils.closeStream(out); - out = null; - IOUtils.closeStream(blockReplyStream); - blockReplyStream = null; - } - } - return result; - } - } - - private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) { - if (favoredNodes == null) { - return null; - } else { - boolean[] pinnings = new boolean[nodes.length]; - HashSet favoredSet = - new HashSet(Arrays.asList(favoredNodes)); - for (int i = 0; i < nodes.length; i++) { - pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname()); - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug(nodes[i].getXferAddrWithHostname() + - " was chosen by name node (favored=" + pinnings[i] + - ")."); - } - } - if (shouldLog && !favoredSet.isEmpty()) { - // There is one or more favored nodes that were not allocated. - DFSClient.LOG.warn( - "These favored nodes were specified but not chosen: " + - favoredSet + - " Specified favored nodes: " + Arrays.toString(favoredNodes)); - - } - return pinnings; - } - } - - private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { - int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; - long sleeptime = dfsClient.getConf(). - blockWriteLocateFollowingInitialDelayMs; - while (true) { - long localstart = Time.monotonicNow(); - while (true) { - try { - return dfsClient.namenode.addBlock(src, dfsClient.clientName, - block, excludedNodes, fileId, favoredNodes); - } catch (RemoteException e) { - IOException ue = - e.unwrapRemoteException(FileNotFoundException.class, - AccessControlException.class, - NSQuotaExceededException.class, - DSQuotaExceededException.class, - UnresolvedPathException.class); - if (ue != e) { - throw ue; // no need to retry these exceptions - } - - - if (NotReplicatedYetException.class.getName(). - equals(e.getClassName())) { - if (retries == 0) { - throw e; - } else { - --retries; - DFSClient.LOG.info("Exception while adding a block", e); - long elapsed = Time.monotonicNow() - localstart; - if (elapsed > 5000) { - DFSClient.LOG.info("Waiting for replication for " - + (elapsed / 1000) + " seconds"); - } - try { - DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src - + " retries left " + retries); - Thread.sleep(sleeptime); - sleeptime *= 2; - } catch (InterruptedException ie) { - DFSClient.LOG.warn("Caught exception ", ie); - } - } - } else { - throw e; - } - - } - } - } - } - - ExtendedBlock getBlock() { - return block; - } - - DatanodeInfo[] getNodes() { - return nodes; - } - - Token getBlockToken() { - return accessToken; - } - - private void setLastException(IOException e) { - lastException.compareAndSet(null, e); - } - } - - /** - * Create a socket for a write pipeline - * @param first the first datanode - * @param length the pipeline length - * @param client client - * @return the socket connected to the first datanode - */ - static Socket createSocketForPipeline(final DatanodeInfo first, - final int length, final DFSClient client) throws IOException { - final String dnAddr = first.getXferAddr( - client.getConf().connectToDnViaHostname); - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Connecting to datanode " + dnAddr); - } - final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); - final Socket sock = client.socketFactory.createSocket(); - final int timeout = client.getDatanodeReadTimeout(length); - NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout); - sock.setSoTimeout(timeout); - sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - if(DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize()); - } - return sock; - } - @Override protected void checkClosed() throws IOException { if (isClosed()) { - IOException e = lastException.get(); + IOException e = streamer.getLastException().get(); throw e != null ? e : new ClosedChannelException(); } } @@ -1536,7 +148,7 @@ protected void checkClosed() throws IOException { // @VisibleForTesting public synchronized DatanodeInfo[] getPipeline() { - if (streamer == null) { + if (streamer.streamerClosed()) { return null; } DatanodeInfo[] currentNodes = streamer.getNodes(); @@ -1556,7 +168,7 @@ public synchronized DatanodeInfo[] getPipeline() { */ private static DataChecksum getChecksum4Compute(DataChecksum checksum, HdfsFileStatus stat) { - if (isLazyPersist(stat) && stat.getReplication() == 1) { + if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) { // do not compute checksum for writing to single replica to memory return DataChecksum.newDataChecksum(Type.NULL, checksum.getBytesPerChecksum()); @@ -1573,7 +185,6 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, this.blockSize = stat.getBlockSize(); this.blockReplication = stat.getReplication(); this.fileEncryptionInfo = stat.getFileEncryptionInfo(); - this.progress = progress; this.cachingStrategy = new AtomicReference( dfsClient.getDefaultWriteCachingStrategy()); if ((progress != null) && DFSClient.LOG.isDebugEnabled()) { @@ -1591,10 +202,6 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum + ") must divide block size (=" + blockSize + ")."); } - this.checksum4WriteBlock = checksum; - - this.dfsclientSlowLogThresholdMs = - dfsClient.getConf().dfsclientSlowIoWarningThresholdMs; this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager(); } @@ -1607,7 +214,8 @@ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); - streamer = new DataStreamer(stat, null); + streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, + cachingStrategy, byteArrayManager); if (favoredNodes != null && favoredNodes.length != 0) { streamer.setFavoredNodes(favoredNodes); } @@ -1676,18 +284,57 @@ private DFSOutputStream(DFSClient dfsClient, String src, boolean toNewBlock, this(dfsClient, src, progress, stat, checksum); initialFileSize = stat.getLen(); // length of file when opened + this.fileEncryptionInfo = stat.getFileEncryptionInfo(); + // The last partial block of the file has to be filled. if (!toNewBlock && lastBlock != null) { // indicate that we are appending to an existing block - bytesCurBlock = lastBlock.getBlockSize(); - streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum); + streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum, + cachingStrategy, byteArrayManager); + streamer.setBytesCurBlock(lastBlock.getBlockSize()); + adjustPacketChunkSize(stat); + streamer.setPipelineInConstruction(lastBlock); } else { computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); - streamer = new DataStreamer(stat, - lastBlock != null ? lastBlock.getBlock() : null); + streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null, + dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager); + } + } + + private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{ + + long usedInLastBlock = stat.getLen() % blockSize; + int freeInLastBlock = (int)(blockSize - usedInLastBlock); + + // calculate the amount of free space in the pre-existing + // last crc chunk + int usedInCksum = (int)(stat.getLen() % bytesPerChecksum); + int freeInCksum = bytesPerChecksum - usedInCksum; + + // if there is space in the last block, then we have to + // append to that block + if (freeInLastBlock == blockSize) { + throw new IOException("The last block for file " + + src + " is full."); + } + + if (usedInCksum > 0 && freeInCksum > 0) { + // if there is space in the last partial chunk, then + // setup in such a way that the next packet will have only + // one chunk that fills up the partial chunk. + // + computePacketChunkSize(0, freeInCksum); + setChecksumBufSize(freeInCksum); + streamer.setAppendChunk(true); + } else { + // if the remaining space in the block is smaller than + // that expected size of of a packet, then create + // smaller size packet. + // + computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock), + bytesPerChecksum); } - this.fileEncryptionInfo = stat.getFileEncryptionInfo(); } static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, @@ -1708,12 +355,6 @@ static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, scope.close(); } } - - private static boolean isLazyPersist(HdfsFileStatus stat) { - final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy( - HdfsConstants.MEMORY_STORAGE_POLICY_NAME); - return p != null && stat.getStoragePolicy() == p.getId(); - } private void computePacketChunkSize(int psize, int csize) { final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN; @@ -1728,62 +369,6 @@ private void computePacketChunkSize(int psize, int csize) { } } - private void queueCurrentPacket() { - synchronized (dataQueue) { - if (currentPacket == null) return; - currentPacket.addTraceParent(Trace.currentSpan()); - dataQueue.addLast(currentPacket); - lastQueuedSeqno = currentPacket.getSeqno(); - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno()); - } - currentPacket = null; - dataQueue.notifyAll(); - } - } - - private void waitAndQueueCurrentPacket() throws IOException { - synchronized (dataQueue) { - try { - // If queue is full, then wait till we have enough space - boolean firstWait = true; - try { - while (!isClosed() && dataQueue.size() + ackQueue.size() > - dfsClient.getConf().writeMaxPackets) { - if (firstWait) { - Span span = Trace.currentSpan(); - if (span != null) { - span.addTimelineAnnotation("dataQueue.wait"); - } - firstWait = false; - } - try { - dataQueue.wait(); - } catch (InterruptedException e) { - // If we get interrupted while waiting to queue data, we still need to get rid - // of the current packet. This is because we have an invariant that if - // currentPacket gets full, it will get queued before the next writeChunk. - // - // Rather than wait around for space in the queue, we should instead try to - // return to the caller as soon as possible, even though we slightly overrun - // the MAX_PACKETS length. - Thread.currentThread().interrupt(); - break; - } - } - } finally { - Span span = Trace.currentSpan(); - if ((span != null) && (!firstWait)) { - span.addTimelineAnnotation("end.wait"); - } - } - checkClosed(); - queueCurrentPacket(); - } catch (ClosedChannelException e) { - } - } - } - // @see FSOutputSummer#writeChunk() @Override protected synchronized void writeChunk(byte[] b, int offset, int len, @@ -1814,57 +399,62 @@ private synchronized void writeChunkImpl(byte[] b, int offset, int len, if (currentPacket == null) { currentPacket = createPacket(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++, false); + streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + currentPacket.getSeqno() + ", src=" + src + ", packetSize=" + packetSize + ", chunksPerPacket=" + chunksPerPacket + - ", bytesCurBlock=" + bytesCurBlock); + ", bytesCurBlock=" + streamer.getBytesCurBlock()); } } currentPacket.writeChecksum(checksum, ckoff, cklen); currentPacket.writeData(b, offset, len); currentPacket.incNumChunks(); - bytesCurBlock += len; + streamer.incBytesCurBlock(len); // If packet is full, enqueue it for transmission // if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || - bytesCurBlock == blockSize) { + streamer.getBytesCurBlock() == blockSize) { if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" + currentPacket.getSeqno() + ", src=" + src + - ", bytesCurBlock=" + bytesCurBlock + + ", bytesCurBlock=" + streamer.getBytesCurBlock() + ", blockSize=" + blockSize + - ", appendChunk=" + appendChunk); + ", appendChunk=" + streamer.getAppendChunk()); } - waitAndQueueCurrentPacket(); + streamer.waitAndQueuePacket(currentPacket); + currentPacket = null; // If the reopened file did not end at chunk boundary and the above // write filled up its partial chunk. Tell the summer to generate full // crc chunks from now on. - if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) { - appendChunk = false; + if (streamer.getAppendChunk() && + streamer.getBytesCurBlock() % bytesPerChecksum == 0) { + streamer.setAppendChunk(false); resetChecksumBufSize(); } - if (!appendChunk) { - int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize); + if (!streamer.getAppendChunk()) { + int psize = Math.min((int)(blockSize-streamer.getBytesCurBlock()), + dfsClient.getConf().writePacketSize); computePacketChunkSize(psize, bytesPerChecksum); } // // if encountering a block boundary, send an empty packet to // indicate the end of block and reset bytesCurBlock. // - if (bytesCurBlock == blockSize) { - currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true); + if (streamer.getBytesCurBlock() == blockSize) { + currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), + streamer.getAndIncCurrentSeqno(), true); currentPacket.setSyncBlock(shouldSyncBlock); - waitAndQueueCurrentPacket(); - bytesCurBlock = 0; + streamer.waitAndQueuePacket(currentPacket); + currentPacket = null; + streamer.setBytesCurBlock(0); lastFlushOffset = 0; } } @@ -1954,30 +544,30 @@ private void flushOrSync(boolean isSync, EnumSet syncFlags) if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient flush(): " - + " bytesCurBlock=" + bytesCurBlock + + " bytesCurBlock=" + streamer.getBytesCurBlock() + " lastFlushOffset=" + lastFlushOffset + " createNewBlock=" + endBlock); } // Flush only if we haven't already flushed till this offset. - if (lastFlushOffset != bytesCurBlock) { - assert bytesCurBlock > lastFlushOffset; + if (lastFlushOffset != streamer.getBytesCurBlock()) { + assert streamer.getBytesCurBlock() > lastFlushOffset; // record the valid offset of this flush - lastFlushOffset = bytesCurBlock; + lastFlushOffset = streamer.getBytesCurBlock(); if (isSync && currentPacket == null && !endBlock) { // Nothing to send right now, // but sync was requested. // Send an empty packet if we do not end the block right now currentPacket = createPacket(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++, false); + streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); } } else { - if (isSync && bytesCurBlock > 0 && !endBlock) { + if (isSync && streamer.getBytesCurBlock() > 0 && !endBlock) { // Nothing to send right now, // and the block was partially written, // and sync was requested. // So send an empty sync packet if we do not end the block right now currentPacket = createPacket(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++, false); + streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); } else if (currentPacket != null) { // just discard the current packet since it is already been sent. currentPacket.releaseBuffer(byteArrayManager); @@ -1986,39 +576,42 @@ private void flushOrSync(boolean isSync, EnumSet syncFlags) } if (currentPacket != null) { currentPacket.setSyncBlock(isSync); - waitAndQueueCurrentPacket(); + streamer.waitAndQueuePacket(currentPacket); + currentPacket = null; } - if (endBlock && bytesCurBlock > 0) { + if (endBlock && streamer.getBytesCurBlock() > 0) { // Need to end the current block, thus send an empty packet to // indicate this is the end of the block and reset bytesCurBlock - currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true); + currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), + streamer.getAndIncCurrentSeqno(), true); currentPacket.setSyncBlock(shouldSyncBlock || isSync); - waitAndQueueCurrentPacket(); - bytesCurBlock = 0; + streamer.waitAndQueuePacket(currentPacket); + currentPacket = null; + streamer.setBytesCurBlock(0); lastFlushOffset = 0; } else { // Restore state of stream. Record the last flush offset // of the last full chunk that was flushed. - bytesCurBlock -= numKept; + streamer.setBytesCurBlock(streamer.getBytesCurBlock() - numKept); } - toWaitFor = lastQueuedSeqno; + toWaitFor = streamer.getLastQueuedSeqno(); } // end synchronized - waitForAckedSeqno(toWaitFor); + streamer.waitForAckedSeqno(toWaitFor); // update the block length first time irrespective of flag - if (updateLength || persistBlocks.get()) { + if (updateLength || streamer.getPersistBlocks().get()) { synchronized (this) { - if (streamer != null && streamer.block != null) { - lastBlockLength = streamer.block.getNumBytes(); + if (!streamer.streamerClosed() && streamer.getBlock() != null) { + lastBlockLength = streamer.getBlock().getNumBytes(); } } } // If 1) any new blocks were allocated since the last flush, or 2) to // update length in NN is required, then persist block locations on // namenode. - if (persistBlocks.getAndSet(false) || updateLength) { + if (streamer.getPersistBlocks().getAndSet(false) || updateLength) { try { dfsClient.namenode.fsync(src, fileId, dfsClient.clientName, lastBlockLength); @@ -2035,7 +628,7 @@ private void flushOrSync(boolean isSync, EnumSet syncFlags) } synchronized(this) { - if (streamer != null) { + if (!streamer.streamerClosed()) { streamer.setHflush(); } } @@ -2048,7 +641,7 @@ private void flushOrSync(boolean isSync, EnumSet syncFlags) DFSClient.LOG.warn("Error while syncing", e); synchronized (this) { if (!isClosed()) { - lastException.set(new IOException("IOException flush: " + e)); + streamer.getLastException().set(new IOException("IOException flush: " + e)); closeThreads(true); } } @@ -2073,7 +666,7 @@ public synchronized int getNumCurrentReplicas() throws IOException { public synchronized int getCurrentBlockReplication() throws IOException { dfsClient.checkOpen(); checkClosed(); - if (streamer == null) { + if (streamer.streamerClosed()) { return blockReplication; // no pipeline, return repl factor of file } DatanodeInfo[] currentNodes = streamer.getNodes(); @@ -2095,47 +688,12 @@ private void flushInternal() throws IOException { // // If there is data in the current buffer, send it across // - queueCurrentPacket(); - toWaitFor = lastQueuedSeqno; + streamer.queuePacket(currentPacket); + currentPacket = null; + toWaitFor = streamer.getLastQueuedSeqno(); } - waitForAckedSeqno(toWaitFor); - } - - private void waitForAckedSeqno(long seqno) throws IOException { - TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER); - try { - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Waiting for ack for: " + seqno); - } - long begin = Time.monotonicNow(); - try { - synchronized (dataQueue) { - while (!isClosed()) { - checkClosed(); - if (lastAckedSeqno >= seqno) { - break; - } - try { - dataQueue.wait(1000); // when we receive an ack, we notify on - // dataQueue - } catch (InterruptedException ie) { - throw new InterruptedIOException( - "Interrupted while waiting for data to be acknowledged by pipeline"); - } - } - } - checkClosed(); - } catch (ClosedChannelException e) { - } - long duration = Time.monotonicNow() - begin; - if (duration > dfsclientSlowLogThresholdMs) { - DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration - + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); - } - } finally { - scope.close(); - } + streamer.waitForAckedSeqno(toWaitFor); } private synchronized void start() { @@ -2157,22 +715,12 @@ synchronized void abort() throws IOException { } boolean isClosed() { - return closed; + return closed || streamer.streamerClosed(); } void setClosed() { closed = true; - synchronized (dataQueue) { - releaseBuffer(dataQueue, byteArrayManager); - releaseBuffer(ackQueue, byteArrayManager); - } - } - - private static void releaseBuffer(List packets, ByteArrayManager bam) { - for (DFSPacket p : packets) { - p.releaseBuffer(bam); - } - packets.clear(); + streamer.release(); } // shutdown datastreamer and responseprocessor threads. @@ -2181,14 +729,11 @@ private void closeThreads(boolean force) throws IOException { try { streamer.close(force); streamer.join(); - if (s != null) { - s.close(); - } + streamer.closeSocket(); } catch (InterruptedException e) { throw new IOException("Failed to shutdown streamer"); } finally { - streamer = null; - s = null; + streamer.setSocketToNull(); setClosed(); } } @@ -2210,7 +755,7 @@ public synchronized void close() throws IOException { private synchronized void closeImpl() throws IOException { if (isClosed()) { - IOException e = lastException.getAndSet(null); + IOException e = streamer.getLastException().getAndSet(null); if (e == null) return; else @@ -2221,12 +766,14 @@ private synchronized void closeImpl() throws IOException { flushBuffer(); // flush from all upper layers if (currentPacket != null) { - waitAndQueueCurrentPacket(); + streamer.waitAndQueuePacket(currentPacket); + currentPacket = null; } - if (bytesCurBlock != 0) { + if (streamer.getBytesCurBlock() != 0) { // send an empty packet to mark the end of the block - currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true); + currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), + streamer.getAndIncCurrentSeqno(), true); currentPacket.setSyncBlock(shouldSyncBlock); } @@ -2261,7 +808,7 @@ private void completeFile(ExtendedBlock last) throws IOException { if (!fileComplete) { final int hdfsTimeout = dfsClient.getHdfsTimeout(); if (!dfsClient.clientRunning - || (hdfsTimeout > 0 + || (hdfsTimeout > 0 && localstart + hdfsTimeout < Time.monotonicNow())) { String msg = "Unable to close file because dfsclient " + " was unable to contact the HDFS servers." + @@ -2290,7 +837,7 @@ private void completeFile(ExtendedBlock last) throws IOException { @VisibleForTesting public void setArtificialSlowdown(long period) { - artificialSlowdown = period; + streamer.setArtificialSlowdown(period); } @VisibleForTesting @@ -2299,10 +846,6 @@ public synchronized void setChunksPerPacket(int value) { packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket; } - synchronized void setTestFilename(String newname) { - src = newname; - } - /** * Returns the size of a file as it was when this stream was opened */ @@ -2345,9 +888,4 @@ ExtendedBlock getBlock() { public long getFileId() { return fileId; } - - private static void arraycopy(T[] srcs, T[] dsts, int skipIndex) { - System.arraycopy(srcs, 0, dsts, 0, skipIndex); - System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java new file mode 100644 index 0000000000000..6047825a526ef --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -0,0 +1,1754 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.QuotaExceededException; +import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.hdfs.util.ByteArrayManager; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.Time; +import org.apache.htrace.NullScope; +import org.apache.htrace.Sampler; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceInfo; +import org.apache.htrace.TraceScope; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +/********************************************************************* + * + * The DataStreamer class is responsible for sending data packets to the + * datanodes in the pipeline. It retrieves a new blockid and block locations + * from the namenode, and starts streaming packets to the pipeline of + * Datanodes. Every packet has a sequence number associated with + * it. When all the packets for a block are sent out and acks for each + * if them are received, the DataStreamer closes the current block. + * + * The DataStreamer thread picks up packets from the dataQueue, sends it to + * the first datanode in the pipeline and moves it from the dataQueue to the + * ackQueue. The ResponseProcessor receives acks from the datanodes. When an + * successful ack for a packet is received from all datanodes, the + * ResponseProcessor removes the corresponding packet from the ackQueue. + * + * In case of error, all outstanding packets are moved from ackQueue. A new + * pipeline is setup by eliminating the bad datanode from the original + * pipeline. The DataStreamer now starts sending packets from the dataQueue. + * + *********************************************************************/ + +class DataStreamer extends Daemon { + /** + * Create a socket for a write pipeline + * + * @param first the first datanode + * @param length the pipeline length + * @param client client + * @return the socket connected to the first datanode + */ + static Socket createSocketForPipeline(final DatanodeInfo first, + final int length, final DFSClient client) throws IOException { + final String dnAddr = first.getXferAddr( + client.getConf().connectToDnViaHostname); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Connecting to datanode " + dnAddr); + } + final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); + final Socket sock = client.socketFactory.createSocket(); + final int timeout = client.getDatanodeReadTimeout(length); + NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout); + sock.setSoTimeout(timeout); + sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + if(DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize()); + } + return sock; + } + + /** + * if this file is lazy persist + * + * @param stat the HdfsFileStatus of a file + * @return if this file is lazy persist + */ + static boolean isLazyPersist(HdfsFileStatus stat) { + final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy( + HdfsConstants.MEMORY_STORAGE_POLICY_NAME); + return p != null && stat.getStoragePolicy() == p.getId(); + } + + /** + * release a list of packets to ByteArrayManager + * + * @param packets packets to be release + * @param bam ByteArrayManager + */ + private static void releaseBuffer(List packets, ByteArrayManager bam) { + for(DFSPacket p : packets) { + p.releaseBuffer(bam); + } + packets.clear(); + } + + private volatile boolean streamerClosed = false; + private ExtendedBlock block; // its length is number of bytes acked + private Token accessToken; + private DataOutputStream blockStream; + private DataInputStream blockReplyStream; + private ResponseProcessor response = null; + private volatile DatanodeInfo[] nodes = null; // list of targets for current block + private volatile StorageType[] storageTypes = null; + private volatile String[] storageIDs = null; + private String[] favoredNodes; + volatile boolean hasError = false; + volatile int errorIndex = -1; + // Restarting node index + AtomicInteger restartingNodeIndex = new AtomicInteger(-1); + private long restartDeadline = 0; // Deadline of DN restart + private BlockConstructionStage stage; // block construction stage + private long bytesSent = 0; // number of bytes that've been sent + private final boolean isLazyPersistFile; + + /** Nodes have been used in the pipeline before and have failed. */ + private final List failed = new ArrayList<>(); + /** The last ack sequence number before pipeline failure. */ + private long lastAckedSeqnoBeforeFailure = -1; + private int pipelineRecoveryCount = 0; + /** Has the current block been hflushed? */ + private boolean isHflushed = false; + /** Append on an existing block? */ + private boolean isAppend; + + private long currentSeqno = 0; + private long lastQueuedSeqno = -1; + private long lastAckedSeqno = -1; + private long bytesCurBlock = 0; // bytes written in current block + private final AtomicReference lastException = new AtomicReference<>(); + private Socket s; + + private final DFSClient dfsClient; + private final String src; + /** Only for DataTransferProtocol.writeBlock(..) */ + private final DataChecksum checksum4WriteBlock; + private final Progressable progress; + private final HdfsFileStatus stat; + // appending to existing partial block + private volatile boolean appendChunk = false; + // both dataQueue and ackQueue are protected by dataQueue lock + private final LinkedList dataQueue = new LinkedList<>(); + private final LinkedList ackQueue = new LinkedList<>(); + private final AtomicReference cachingStrategy; + private final ByteArrayManager byteArrayManager; + private static final BlockStoragePolicySuite blockStoragePolicySuite = + BlockStoragePolicySuite.createDefaultSuite(); + //persist blocks on namenode + private final AtomicBoolean persistBlocks = new AtomicBoolean(false); + private boolean failPacket = false; + private final long dfsclientSlowLogThresholdMs; + private long artificialSlowdown = 0; + + private final LoadingCache excludedNodes; + + private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, + Progressable progress, DataChecksum checksum, + AtomicReference cachingStrategy, + ByteArrayManager byteArrayManage){ + this.dfsClient = dfsClient; + this.src = src; + this.progress = progress; + this.stat = stat; + this.checksum4WriteBlock = checksum; + this.cachingStrategy = cachingStrategy; + this.byteArrayManager = byteArrayManage; + isLazyPersistFile = isLazyPersist(stat); + this.dfsclientSlowLogThresholdMs = + dfsClient.getConf().dfsclientSlowIoWarningThresholdMs; + excludedNodes = initExcludedNodes(); + } + + /** + * construction with tracing info + */ + DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, + String src, Progressable progress, DataChecksum checksum, + AtomicReference cachingStrategy, + ByteArrayManager byteArrayManage) { + this(stat, dfsClient, src, progress, checksum, cachingStrategy, + byteArrayManage); + isAppend = false; + this.block = block; + stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; + } + + /** + * Construct a data streamer for appending to the last partial block + * @param lastBlock last block of the file to be appended + * @param stat status of the file to be appended + * @throws IOException if error occurs + */ + DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient, + String src, Progressable progress, DataChecksum checksum, + AtomicReference cachingStrategy, + ByteArrayManager byteArrayManage) throws IOException { + this(stat, dfsClient, src, progress, checksum, cachingStrategy, + byteArrayManage); + isAppend = true; + stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; + block = lastBlock.getBlock(); + bytesSent = block.getNumBytes(); + accessToken = lastBlock.getBlockToken(); + } + + /** + * Set pipeline in construction + * + * @param lastBlock the last block of a file + * @throws IOException + */ + void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{ + // setup pipeline to append to the last block XXX retries?? + setPipeline(lastBlock); + errorIndex = -1; // no errors yet. + if (nodes.length < 1) { + throw new IOException("Unable to retrieve blocks locations " + + " for last block " + block + + "of file " + src); + } + } + + private void setPipeline(LocatedBlock lb) { + setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); + } + + private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, + String[] storageIDs) { + this.nodes = nodes; + this.storageTypes = storageTypes; + this.storageIDs = storageIDs; + } + + /** + * Set favored nodes + * + * @param favoredNodes favored nodes + */ + void setFavoredNodes(String[] favoredNodes) { + this.favoredNodes = favoredNodes; + } + + /** + * Initialize for data streaming + */ + private void initDataStreaming() { + this.setName("DataStreamer for file " + src + + " block " + block); + response = new ResponseProcessor(nodes); + response.start(); + stage = BlockConstructionStage.DATA_STREAMING; + } + + private void endBlock() { + if(DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Closing old block " + block); + } + this.setName("DataStreamer for file " + src); + closeResponder(); + closeStream(); + setPipeline(null, null, null); + stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; + } + + /* + * streamer thread is the only thread that opens streams to datanode, + * and closes them. Any error recovery is also done by this thread. + */ + @Override + public void run() { + long lastPacket = Time.monotonicNow(); + TraceScope scope = NullScope.INSTANCE; + while (!streamerClosed && dfsClient.clientRunning) { + // if the Responder encountered an error, shutdown Responder + if (hasError && response != null) { + try { + response.close(); + response.join(); + response = null; + } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); + } + } + + DFSPacket one; + try { + // process datanode IO errors if any + boolean doSleep = false; + if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) { + doSleep = processDatanodeError(); + } + + synchronized (dataQueue) { + // wait for a packet to be sent. + long now = Time.monotonicNow(); + while ((!streamerClosed && !hasError && dfsClient.clientRunning + && dataQueue.size() == 0 && + (stage != BlockConstructionStage.DATA_STREAMING || + stage == BlockConstructionStage.DATA_STREAMING && + now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) { + long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket); + timeout = timeout <= 0 ? 1000 : timeout; + timeout = (stage == BlockConstructionStage.DATA_STREAMING)? + timeout : 1000; + try { + dataQueue.wait(timeout); + } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); + } + doSleep = false; + now = Time.monotonicNow(); + } + if (streamerClosed || hasError || !dfsClient.clientRunning) { + continue; + } + // get packet to be sent. + if (dataQueue.isEmpty()) { + one = createHeartbeatPacket(); + assert one != null; + } else { + one = dataQueue.getFirst(); // regular data packet + long parents[] = one.getTraceParents(); + if (parents.length > 0) { + scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0])); + // TODO: use setParents API once it's available from HTrace 3.2 + // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS); + // scope.getSpan().setParents(parents); + } + } + } + + // get new block from namenode. + if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { + if(DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Allocating new block"); + } + setPipeline(nextBlockOutputStream()); + initDataStreaming(); + } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { + if(DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Append to block " + block); + } + setupPipelineForAppendOrRecovery(); + initDataStreaming(); + } + + long lastByteOffsetInBlock = one.getLastByteOffsetBlock(); + if (lastByteOffsetInBlock > stat.getBlockSize()) { + throw new IOException("BlockSize " + stat.getBlockSize() + + " is smaller than data size. " + + " Offset of packet in block " + + lastByteOffsetInBlock + + " Aborting file " + src); + } + + if (one.isLastPacketInBlock()) { + // wait for all data packets have been successfully acked + synchronized (dataQueue) { + while (!streamerClosed && !hasError && + ackQueue.size() != 0 && dfsClient.clientRunning) { + try { + // wait for acks to arrive from datanodes + dataQueue.wait(1000); + } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); + } + } + } + if (streamerClosed || hasError || !dfsClient.clientRunning) { + continue; + } + stage = BlockConstructionStage.PIPELINE_CLOSE; + } + + // send the packet + Span span = null; + synchronized (dataQueue) { + // move packet from dataQueue to ackQueue + if (!one.isHeartbeatPacket()) { + span = scope.detach(); + one.setTraceSpan(span); + dataQueue.removeFirst(); + ackQueue.addLast(one); + dataQueue.notifyAll(); + } + } + + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("DataStreamer block " + block + + " sending packet " + one); + } + + // write out data to remote datanode + TraceScope writeScope = Trace.startSpan("writeTo", span); + try { + one.writeTo(blockStream); + blockStream.flush(); + } catch (IOException e) { + // HDFS-3398 treat primary DN is down since client is unable to + // write to primary DN. If a failed or restarting node has already + // been recorded by the responder, the following call will have no + // effect. Pipeline recovery can handle only one node error at a + // time. If the primary node fails again during the recovery, it + // will be taken out then. + tryMarkPrimaryDatanodeFailed(); + throw e; + } finally { + writeScope.close(); + } + lastPacket = Time.monotonicNow(); + + // update bytesSent + long tmpBytesSent = one.getLastByteOffsetBlock(); + if (bytesSent < tmpBytesSent) { + bytesSent = tmpBytesSent; + } + + if (streamerClosed || hasError || !dfsClient.clientRunning) { + continue; + } + + // Is this block full? + if (one.isLastPacketInBlock()) { + // wait for the close packet has been acked + synchronized (dataQueue) { + while (!streamerClosed && !hasError && + ackQueue.size() != 0 && dfsClient.clientRunning) { + dataQueue.wait(1000);// wait for acks to arrive from datanodes + } + } + if (streamerClosed || hasError || !dfsClient.clientRunning) { + continue; + } + + endBlock(); + } + if (progress != null) { progress.progress(); } + + // This is used by unit test to trigger race conditions. + if (artificialSlowdown != 0 && dfsClient.clientRunning) { + Thread.sleep(artificialSlowdown); + } + } catch (Throwable e) { + // Log warning if there was a real error. + if (restartingNodeIndex.get() == -1) { + // Since their messages are descriptive enough, do not always + // log a verbose stack-trace WARN for quota exceptions. + if (e instanceof QuotaExceededException) { + DFSClient.LOG.debug("DataStreamer Quota Exception", e); + } else { + DFSClient.LOG.warn("DataStreamer Exception", e); + } + } + if (e instanceof IOException) { + setLastException((IOException)e); + } else { + setLastException(new IOException("DataStreamer Exception: ",e)); + } + hasError = true; + if (errorIndex == -1 && restartingNodeIndex.get() == -1) { + // Not a datanode issue + streamerClosed = true; + } + } finally { + scope.close(); + } + } + closeInternal(); + } + + private void closeInternal() { + closeResponder(); // close and join + closeStream(); + streamerClosed = true; + release(); + synchronized (dataQueue) { + dataQueue.notifyAll(); + } + } + + /** + * release the DFSPackets in the two queues + * + */ + void release() { + synchronized (dataQueue) { + releaseBuffer(dataQueue, byteArrayManager); + releaseBuffer(ackQueue, byteArrayManager); + } + } + + /** + * wait for the ack of seqno + * + * @param seqno the sequence number to be acked + * @throws IOException + */ + void waitForAckedSeqno(long seqno) throws IOException { + TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER); + try { + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Waiting for ack for: " + seqno); + } + long begin = Time.monotonicNow(); + try { + synchronized (dataQueue) { + while (!streamerClosed) { + checkClosed(); + if (lastAckedSeqno >= seqno) { + break; + } + try { + dataQueue.wait(1000); // when we receive an ack, we notify on + // dataQueue + } catch (InterruptedException ie) { + throw new InterruptedIOException( + "Interrupted while waiting for data to be acknowledged by pipeline"); + } + } + } + checkClosed(); + } catch (ClosedChannelException e) { + } + long duration = Time.monotonicNow() - begin; + if (duration > dfsclientSlowLogThresholdMs) { + DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration + + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); + } + } finally { + scope.close(); + } + } + + /** + * wait for space of dataQueue and queue the packet + * + * @param packet the DFSPacket to be queued + * @throws IOException + */ + void waitAndQueuePacket(DFSPacket packet) throws IOException { + synchronized (dataQueue) { + try { + // If queue is full, then wait till we have enough space + boolean firstWait = true; + try { + while (!streamerClosed && dataQueue.size() + ackQueue.size() > + dfsClient.getConf().writeMaxPackets) { + if (firstWait) { + Span span = Trace.currentSpan(); + if (span != null) { + span.addTimelineAnnotation("dataQueue.wait"); + } + firstWait = false; + } + try { + dataQueue.wait(); + } catch (InterruptedException e) { + // If we get interrupted while waiting to queue data, we still need to get rid + // of the current packet. This is because we have an invariant that if + // currentPacket gets full, it will get queued before the next writeChunk. + // + // Rather than wait around for space in the queue, we should instead try to + // return to the caller as soon as possible, even though we slightly overrun + // the MAX_PACKETS length. + Thread.currentThread().interrupt(); + break; + } + } + } finally { + Span span = Trace.currentSpan(); + if ((span != null) && (!firstWait)) { + span.addTimelineAnnotation("end.wait"); + } + } + checkClosed(); + queuePacket(packet); + } catch (ClosedChannelException e) { + } + } + } + + /* + * close the streamer, should be called only by an external thread + * and only after all data to be sent has been flushed to datanode. + * + * Interrupt this data streamer if force is true + * + * @param force if this data stream is forced to be closed + */ + void close(boolean force) { + streamerClosed = true; + synchronized (dataQueue) { + dataQueue.notifyAll(); + } + if (force) { + this.interrupt(); + } + } + + + private void checkClosed() throws IOException { + if (streamerClosed) { + IOException e = lastException.get(); + throw e != null ? e : new ClosedChannelException(); + } + } + + private void closeResponder() { + if (response != null) { + try { + response.close(); + response.join(); + } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); + } finally { + response = null; + } + } + } + + private void closeStream() { + if (blockStream != null) { + try { + blockStream.close(); + } catch (IOException e) { + setLastException(e); + } finally { + blockStream = null; + } + } + if (blockReplyStream != null) { + try { + blockReplyStream.close(); + } catch (IOException e) { + setLastException(e); + } finally { + blockReplyStream = null; + } + } + if (null != s) { + try { + s.close(); + } catch (IOException e) { + setLastException(e); + } finally { + s = null; + } + } + } + + // The following synchronized methods are used whenever + // errorIndex or restartingNodeIndex is set. This is because + // check & set needs to be atomic. Simply reading variables + // does not require a synchronization. When responder is + // not running (e.g. during pipeline recovery), there is no + // need to use these methods. + + /** Set the error node index. Called by responder */ + synchronized void setErrorIndex(int idx) { + errorIndex = idx; + } + + /** Set the restarting node index. Called by responder */ + synchronized void setRestartingNodeIndex(int idx) { + restartingNodeIndex.set(idx); + // If the data streamer has already set the primary node + // bad, clear it. It is likely that the write failed due to + // the DN shutdown. Even if it was a real failure, the pipeline + // recovery will take care of it. + errorIndex = -1; + } + + /** + * This method is used when no explicit error report was received, + * but something failed. When the primary node is a suspect or + * unsure about the cause, the primary node is marked as failed. + */ + synchronized void tryMarkPrimaryDatanodeFailed() { + // There should be no existing error and no ongoing restart. + if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) { + errorIndex = 0; + } + } + + /** + * Examine whether it is worth waiting for a node to restart. + * @param index the node index + */ + boolean shouldWaitForRestart(int index) { + // Only one node in the pipeline. + if (nodes.length == 1) { + return true; + } + + // Is it a local node? + InetAddress addr = null; + try { + addr = InetAddress.getByName(nodes[index].getIpAddr()); + } catch (java.net.UnknownHostException e) { + // we are passing an ip address. this should not happen. + assert false; + } + + if (addr != null && NetUtils.isLocalAddress(addr)) { + return true; + } + return false; + } + + // + // Processes responses from the datanodes. A packet is removed + // from the ackQueue when its response arrives. + // + private class ResponseProcessor extends Daemon { + + private volatile boolean responderClosed = false; + private DatanodeInfo[] targets = null; + private boolean isLastPacketInBlock = false; + + ResponseProcessor (DatanodeInfo[] targets) { + this.targets = targets; + } + + @Override + public void run() { + + setName("ResponseProcessor for block " + block); + PipelineAck ack = new PipelineAck(); + + TraceScope scope = NullScope.INSTANCE; + while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) { + // process responses from datanodes. + try { + // read an ack from the pipeline + long begin = Time.monotonicNow(); + ack.readFields(blockReplyStream); + long duration = Time.monotonicNow() - begin; + if (duration > dfsclientSlowLogThresholdMs + && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) { + DFSClient.LOG + .warn("Slow ReadProcessor read fields took " + duration + + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " + + ack + ", targets: " + Arrays.asList(targets)); + } else if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("DFSClient " + ack); + } + + long seqno = ack.getSeqno(); + // processes response status from datanodes. + for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) { + final Status reply = PipelineAck.getStatusFromHeader(ack + .getReply(i)); + // Restart will not be treated differently unless it is + // the local node or the only one in the pipeline. + if (PipelineAck.isRestartOOBStatus(reply) && + shouldWaitForRestart(i)) { + restartDeadline = dfsClient.getConf().datanodeRestartTimeout + + Time.monotonicNow(); + setRestartingNodeIndex(i); + String message = "A datanode is restarting: " + targets[i]; + DFSClient.LOG.info(message); + throw new IOException(message); + } + // node error + if (reply != SUCCESS) { + setErrorIndex(i); // first bad datanode + throw new IOException("Bad response " + reply + + " for block " + block + + " from datanode " + + targets[i]); + } + } + + assert seqno != PipelineAck.UNKOWN_SEQNO : + "Ack for unknown seqno should be a failed ack: " + ack; + if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack + continue; + } + + // a success ack for a data packet + DFSPacket one; + synchronized (dataQueue) { + one = ackQueue.getFirst(); + } + if (one.getSeqno() != seqno) { + throw new IOException("ResponseProcessor: Expecting seqno " + + " for block " + block + + one.getSeqno() + " but received " + seqno); + } + isLastPacketInBlock = one.isLastPacketInBlock(); + + // Fail the packet write for testing in order to force a + // pipeline recovery. + if (DFSClientFaultInjector.get().failPacket() && + isLastPacketInBlock) { + failPacket = true; + throw new IOException( + "Failing the last packet for testing."); + } + + // update bytesAcked + block.setNumBytes(one.getLastByteOffsetBlock()); + + synchronized (dataQueue) { + scope = Trace.continueSpan(one.getTraceSpan()); + one.setTraceSpan(null); + lastAckedSeqno = seqno; + ackQueue.removeFirst(); + dataQueue.notifyAll(); + + one.releaseBuffer(byteArrayManager); + } + } catch (Exception e) { + if (!responderClosed) { + if (e instanceof IOException) { + setLastException((IOException)e); + } + hasError = true; + // If no explicit error report was received, mark the primary + // node as failed. + tryMarkPrimaryDatanodeFailed(); + synchronized (dataQueue) { + dataQueue.notifyAll(); + } + if (restartingNodeIndex.get() == -1) { + DFSClient.LOG.warn("DataStreamer ResponseProcessor exception " + + " for block " + block, e); + } + responderClosed = true; + } + } finally { + scope.close(); + } + } + } + + void close() { + responderClosed = true; + this.interrupt(); + } + } + + // If this stream has encountered any errors so far, shutdown + // threads and mark stream as closed. Returns true if we should + // sleep for a while after returning from this call. + // + private boolean processDatanodeError() throws IOException { + if (response != null) { + DFSClient.LOG.info("Error Recovery for " + block + + " waiting for responder to exit. "); + return true; + } + closeStream(); + + // move packets from ack queue to front of the data queue + synchronized (dataQueue) { + dataQueue.addAll(0, ackQueue); + ackQueue.clear(); + } + + // Record the new pipeline failure recovery. + if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) { + lastAckedSeqnoBeforeFailure = lastAckedSeqno; + pipelineRecoveryCount = 1; + } else { + // If we had to recover the pipeline five times in a row for the + // same packet, this client likely has corrupt data or corrupting + // during transmission. + if (++pipelineRecoveryCount > 5) { + DFSClient.LOG.warn("Error recovering pipeline for writing " + + block + ". Already retried 5 times for the same packet."); + lastException.set(new IOException("Failing write. Tried pipeline " + + "recovery 5 times without success.")); + streamerClosed = true; + return false; + } + } + boolean doSleep = setupPipelineForAppendOrRecovery(); + + if (!streamerClosed && dfsClient.clientRunning) { + if (stage == BlockConstructionStage.PIPELINE_CLOSE) { + + // If we had an error while closing the pipeline, we go through a fast-path + // where the BlockReceiver does not run. Instead, the DataNode just finalizes + // the block immediately during the 'connect ack' process. So, we want to pull + // the end-of-block packet from the dataQueue, since we don't actually have + // a true pipeline to send it over. + // + // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that + // a client waiting on close() will be aware that the flush finished. + synchronized (dataQueue) { + DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet + Span span = endOfBlockPacket.getTraceSpan(); + if (span != null) { + // Close any trace span associated with this Packet + TraceScope scope = Trace.continueSpan(span); + scope.close(); + } + assert endOfBlockPacket.isLastPacketInBlock(); + assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1; + lastAckedSeqno = endOfBlockPacket.getSeqno(); + dataQueue.notifyAll(); + } + endBlock(); + } else { + initDataStreaming(); + } + } + + return doSleep; + } + + void setHflush() { + isHflushed = true; + } + + private int findNewDatanode(final DatanodeInfo[] original + ) throws IOException { + if (nodes.length != original.length + 1) { + throw new IOException( + new StringBuilder() + .append("Failed to replace a bad datanode on the existing pipeline ") + .append("due to no more good datanodes being available to try. ") + .append("(Nodes: current=").append(Arrays.asList(nodes)) + .append(", original=").append(Arrays.asList(original)).append("). ") + .append("The current failed datanode replacement policy is ") + .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ") + .append("a client may configure this via '") + .append(DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY) + .append("' in its configuration.") + .toString()); + } + for(int i = 0; i < nodes.length; i++) { + int j = 0; + for(; j < original.length && !nodes[i].equals(original[j]); j++); + if (j == original.length) { + return i; + } + } + throw new IOException("Failed: new datanode not found: nodes=" + + Arrays.asList(nodes) + ", original=" + Arrays.asList(original)); + } + + private void addDatanode2ExistingPipeline() throws IOException { + if (DataTransferProtocol.LOG.isDebugEnabled()) { + DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno); + } + /* + * Is data transfer necessary? We have the following cases. + * + * Case 1: Failure in Pipeline Setup + * - Append + * + Transfer the stored replica, which may be a RBW or a finalized. + * - Create + * + If no data, then no transfer is required. + * + If there are data written, transfer RBW. This case may happens + * when there are streaming failure earlier in this pipeline. + * + * Case 2: Failure in Streaming + * - Append/Create: + * + transfer RBW + * + * Case 3: Failure in Close + * - Append/Create: + * + no transfer, let NameNode replicates the block. + */ + if (!isAppend && lastAckedSeqno < 0 + && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { + //no data have been written + return; + } else if (stage == BlockConstructionStage.PIPELINE_CLOSE + || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { + //pipeline is closing + return; + } + + //get a new datanode + final DatanodeInfo[] original = nodes; + final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode( + src, stat.getFileId(), block, nodes, storageIDs, + failed.toArray(new DatanodeInfo[failed.size()]), + 1, dfsClient.clientName); + setPipeline(lb); + + //find the new datanode + final int d = findNewDatanode(original); + + //transfer replica + final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1]; + final DatanodeInfo[] targets = {nodes[d]}; + final StorageType[] targetStorageTypes = {storageTypes[d]}; + transfer(src, targets, targetStorageTypes, lb.getBlockToken()); + } + + private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, + final Token blockToken) throws IOException { + //transfer replica to the new datanode + Socket sock = null; + DataOutputStream out = null; + DataInputStream in = null; + try { + sock = createSocketForPipeline(src, 2, dfsClient); + final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); + + OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(sock); + IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock, + unbufOut, unbufIn, dfsClient, blockToken, src); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.SMALL_BUFFER_SIZE)); + in = new DataInputStream(unbufIn); + + //send the TRANSFER_BLOCK request + new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, + targets, targetStorageTypes); + out.flush(); + + //ack + BlockOpResponseProto response = + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); + if (SUCCESS != response.getStatus()) { + throw new IOException("Failed to add a datanode"); + } + } finally { + IOUtils.closeStream(in); + IOUtils.closeStream(out); + IOUtils.closeSocket(sock); + } + } + + /** + * Open a DataStreamer to a DataNode pipeline so that + * it can be written to. + * This happens when a file is appended or data streaming fails + * It keeps on trying until a pipeline is setup + */ + private boolean setupPipelineForAppendOrRecovery() throws IOException { + // check number of datanodes + if (nodes == null || nodes.length == 0) { + String msg = "Could not get block locations. " + "Source file \"" + + src + "\" - Aborting..."; + DFSClient.LOG.warn(msg); + setLastException(new IOException(msg)); + streamerClosed = true; + return false; + } + + boolean success = false; + long newGS = 0L; + while (!success && !streamerClosed && dfsClient.clientRunning) { + // Sleep before reconnect if a dn is restarting. + // This process will be repeated until the deadline or the datanode + // starts back up. + if (restartingNodeIndex.get() >= 0) { + // 4 seconds or the configured deadline period, whichever is shorter. + // This is the retry interval and recovery will be retried in this + // interval until timeout or success. + long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout, + 4000L); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + lastException.set(new IOException("Interrupted while waiting for " + + "datanode to restart. " + nodes[restartingNodeIndex.get()])); + streamerClosed = true; + return false; + } + } + boolean isRecovery = hasError; + // remove bad datanode from list of datanodes. + // If errorIndex was not set (i.e. appends), then do not remove + // any datanodes + // + if (errorIndex >= 0) { + StringBuilder pipelineMsg = new StringBuilder(); + for (int j = 0; j < nodes.length; j++) { + pipelineMsg.append(nodes[j]); + if (j < nodes.length - 1) { + pipelineMsg.append(", "); + } + } + if (nodes.length <= 1) { + lastException.set(new IOException("All datanodes " + pipelineMsg + + " are bad. Aborting...")); + streamerClosed = true; + return false; + } + DFSClient.LOG.warn("Error Recovery for block " + block + + " in pipeline " + pipelineMsg + + ": bad datanode " + nodes[errorIndex]); + failed.add(nodes[errorIndex]); + + DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1]; + arraycopy(nodes, newnodes, errorIndex); + + final StorageType[] newStorageTypes = new StorageType[newnodes.length]; + arraycopy(storageTypes, newStorageTypes, errorIndex); + + final String[] newStorageIDs = new String[newnodes.length]; + arraycopy(storageIDs, newStorageIDs, errorIndex); + + setPipeline(newnodes, newStorageTypes, newStorageIDs); + + // Just took care of a node error while waiting for a node restart + if (restartingNodeIndex.get() >= 0) { + // If the error came from a node further away than the restarting + // node, the restart must have been complete. + if (errorIndex > restartingNodeIndex.get()) { + restartingNodeIndex.set(-1); + } else if (errorIndex < restartingNodeIndex.get()) { + // the node index has shifted. + restartingNodeIndex.decrementAndGet(); + } else { + // this shouldn't happen... + assert false; + } + } + + if (restartingNodeIndex.get() == -1) { + hasError = false; + } + lastException.set(null); + errorIndex = -1; + } + + // Check if replace-datanode policy is satisfied. + if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(), + nodes, isAppend, isHflushed)) { + try { + addDatanode2ExistingPipeline(); + } catch(IOException ioe) { + if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) { + throw ioe; + } + DFSClient.LOG.warn("Failed to replace datanode." + + " Continue with the remaining datanodes since " + + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY + + " is set to true.", ioe); + } + } + + // get a new generation stamp and an access token + LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName); + newGS = lb.getBlock().getGenerationStamp(); + accessToken = lb.getBlockToken(); + + // set up the pipeline again with the remaining nodes + if (failPacket) { // for testing + success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); + failPacket = false; + try { + // Give DNs time to send in bad reports. In real situations, + // good reports should follow bad ones, if client committed + // with those nodes. + Thread.sleep(2000); + } catch (InterruptedException ie) {} + } else { + success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); + } + + if (restartingNodeIndex.get() >= 0) { + assert hasError == true; + // check errorIndex set above + if (errorIndex == restartingNodeIndex.get()) { + // ignore, if came from the restarting node + errorIndex = -1; + } + // still within the deadline + if (Time.monotonicNow() < restartDeadline) { + continue; // with in the deadline + } + // expired. declare the restarting node dead + restartDeadline = 0; + int expiredNodeIndex = restartingNodeIndex.get(); + restartingNodeIndex.set(-1); + DFSClient.LOG.warn("Datanode did not restart in time: " + + nodes[expiredNodeIndex]); + // Mark the restarting node as failed. If there is any other failed + // node during the last pipeline construction attempt, it will not be + // overwritten/dropped. In this case, the restarting node will get + // excluded in the following attempt, if it still does not come up. + if (errorIndex == -1) { + errorIndex = expiredNodeIndex; + } + // From this point on, normal pipeline recovery applies. + } + } // while + + if (success) { + // update pipeline at the namenode + ExtendedBlock newBlock = new ExtendedBlock( + block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); + dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, + nodes, storageIDs); + // update client side generation stamp + block = newBlock; + } + return false; // do not sleep, continue processing + } + + /** + * Open a DataStreamer to a DataNode so that it can be written to. + * This happens when a file is created and each time a new block is allocated. + * Must get block ID and the IDs of the destinations from the namenode. + * Returns the list of target datanodes. + */ + private LocatedBlock nextBlockOutputStream() throws IOException { + LocatedBlock lb = null; + DatanodeInfo[] nodes = null; + StorageType[] storageTypes = null; + int count = dfsClient.getConf().nBlockWriteRetry; + boolean success = false; + ExtendedBlock oldBlock = block; + do { + hasError = false; + lastException.set(null); + errorIndex = -1; + success = false; + + DatanodeInfo[] excluded = + excludedNodes.getAllPresent(excludedNodes.asMap().keySet()) + .keySet() + .toArray(new DatanodeInfo[0]); + block = oldBlock; + lb = locateFollowingBlock(excluded.length > 0 ? excluded : null); + block = lb.getBlock(); + block.setNumBytes(0); + bytesSent = 0; + accessToken = lb.getBlockToken(); + nodes = lb.getLocations(); + storageTypes = lb.getStorageTypes(); + + // + // Connect to first DataNode in the list. + // + success = createBlockOutputStream(nodes, storageTypes, 0L, false); + + if (!success) { + DFSClient.LOG.info("Abandoning " + block); + dfsClient.namenode.abandonBlock(block, stat.getFileId(), src, + dfsClient.clientName); + block = null; + DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]); + excludedNodes.put(nodes[errorIndex], nodes[errorIndex]); + } + } while (!success && --count >= 0); + + if (!success) { + throw new IOException("Unable to create new block."); + } + return lb; + } + + // connects to the first datanode in the pipeline + // Returns true if success, otherwise return failure. + // + private boolean createBlockOutputStream(DatanodeInfo[] nodes, + StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { + if (nodes.length == 0) { + DFSClient.LOG.info("nodes are empty for write pipeline of block " + + block); + return false; + } + Status pipelineStatus = SUCCESS; + String firstBadLink = ""; + boolean checkRestart = false; + if (DFSClient.LOG.isDebugEnabled()) { + for (int i = 0; i < nodes.length; i++) { + DFSClient.LOG.debug("pipeline = " + nodes[i]); + } + } + + // persist blocks on namenode on next flush + persistBlocks.set(true); + + int refetchEncryptionKey = 1; + while (true) { + boolean result = false; + DataOutputStream out = null; + try { + assert null == s : "Previous socket unclosed"; + assert null == blockReplyStream : "Previous blockReplyStream unclosed"; + s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); + long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); + + OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(s); + IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s, + unbufOut, unbufIn, dfsClient, accessToken, nodes[0]); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.SMALL_BUFFER_SIZE)); + blockReplyStream = new DataInputStream(unbufIn); + + // + // Xmit header info to datanode + // + + BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage; + + // We cannot change the block length in 'block' as it counts the number + // of bytes ack'ed. + ExtendedBlock blockCopy = new ExtendedBlock(block); + blockCopy.setNumBytes(stat.getBlockSize()); + + boolean[] targetPinnings = getPinnings(nodes, true); + // send the request + new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken, + dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, + nodes.length, block.getNumBytes(), bytesSent, newGS, + checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile, + (targetPinnings == null ? false : targetPinnings[0]), targetPinnings); + + // receive ack for connect + BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( + PBHelper.vintPrefixed(blockReplyStream)); + pipelineStatus = resp.getStatus(); + firstBadLink = resp.getFirstBadLink(); + + // Got an restart OOB ack. + // If a node is already restarting, this status is not likely from + // the same node. If it is from a different node, it is not + // from the local datanode. Thus it is safe to treat this as a + // regular node error. + if (PipelineAck.isRestartOOBStatus(pipelineStatus) && + restartingNodeIndex.get() == -1) { + checkRestart = true; + throw new IOException("A datanode is restarting."); + } + + String logInfo = "ack with firstBadLink as " + firstBadLink; + DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo); + + assert null == blockStream : "Previous blockStream unclosed"; + blockStream = out; + result = true; // success + restartingNodeIndex.set(-1); + hasError = false; + } catch (IOException ie) { + if (restartingNodeIndex.get() == -1) { + DFSClient.LOG.info("Exception in createBlockOutputStream", ie); + } + if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + + nodes[0] + " : " + ie); + // The encryption key used is invalid. + refetchEncryptionKey--; + dfsClient.clearDataEncryptionKey(); + // Don't close the socket/exclude this node just yet. Try again with + // a new encryption key. + continue; + } + + // find the datanode that matches + if (firstBadLink.length() != 0) { + for (int i = 0; i < nodes.length; i++) { + // NB: Unconditionally using the xfer addr w/o hostname + if (firstBadLink.equals(nodes[i].getXferAddr())) { + errorIndex = i; + break; + } + } + } else { + assert checkRestart == false; + errorIndex = 0; + } + // Check whether there is a restart worth waiting for. + if (checkRestart && shouldWaitForRestart(errorIndex)) { + restartDeadline = dfsClient.getConf().datanodeRestartTimeout + + Time.monotonicNow(); + restartingNodeIndex.set(errorIndex); + errorIndex = -1; + DFSClient.LOG.info("Waiting for the datanode to be restarted: " + + nodes[restartingNodeIndex.get()]); + } + hasError = true; + setLastException(ie); + result = false; // error + } finally { + if (!result) { + IOUtils.closeSocket(s); + s = null; + IOUtils.closeStream(out); + out = null; + IOUtils.closeStream(blockReplyStream); + blockReplyStream = null; + } + } + return result; + } + } + + private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) { + if (favoredNodes == null) { + return null; + } else { + boolean[] pinnings = new boolean[nodes.length]; + HashSet favoredSet = + new HashSet(Arrays.asList(favoredNodes)); + for (int i = 0; i < nodes.length; i++) { + pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname()); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug(nodes[i].getXferAddrWithHostname() + + " was chosen by name node (favored=" + pinnings[i] + + ")."); + } + } + if (shouldLog && !favoredSet.isEmpty()) { + // There is one or more favored nodes that were not allocated. + DFSClient.LOG.warn( + "These favored nodes were specified but not chosen: " + + favoredSet + + " Specified favored nodes: " + Arrays.toString(favoredNodes)); + + } + return pinnings; + } + } + + private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) + throws IOException { + int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; + long sleeptime = dfsClient.getConf(). + blockWriteLocateFollowingInitialDelayMs; + while (true) { + long localstart = Time.monotonicNow(); + while (true) { + try { + return dfsClient.namenode.addBlock(src, dfsClient.clientName, + block, excludedNodes, stat.getFileId(), favoredNodes); + } catch (RemoteException e) { + IOException ue = + e.unwrapRemoteException(FileNotFoundException.class, + AccessControlException.class, + NSQuotaExceededException.class, + DSQuotaExceededException.class, + UnresolvedPathException.class); + if (ue != e) { + throw ue; // no need to retry these exceptions + } + + + if (NotReplicatedYetException.class.getName(). + equals(e.getClassName())) { + if (retries == 0) { + throw e; + } else { + --retries; + DFSClient.LOG.info("Exception while adding a block", e); + long elapsed = Time.monotonicNow() - localstart; + if (elapsed > 5000) { + DFSClient.LOG.info("Waiting for replication for " + + (elapsed / 1000) + " seconds"); + } + try { + DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src + + " retries left " + retries); + Thread.sleep(sleeptime); + sleeptime *= 2; + } catch (InterruptedException ie) { + DFSClient.LOG.warn("Caught exception ", ie); + } + } + } else { + throw e; + } + + } + } + } + } + + /** + * get the block this streamer is writing to + * + * @return the block this streamer is writing to + */ + ExtendedBlock getBlock() { + return block; + } + + /** + * return the target datanodes in the pipeline + * + * @return the target datanodes in the pipeline + */ + DatanodeInfo[] getNodes() { + return nodes; + } + + /** + * return the token of the block + * + * @return the token of the block + */ + Token getBlockToken() { + return accessToken; + } + + /** + * set last exception + * + * @param e an exception + */ + void setLastException(IOException e) { + lastException.compareAndSet(null, e); + } + + /** + * Put a packet to the data queue + * + * @param packet the packet to be put into the data queued + */ + void queuePacket(DFSPacket packet) { + synchronized (dataQueue) { + if (packet == null) return; + packet.addTraceParent(Trace.currentSpan()); + dataQueue.addLast(packet); + lastQueuedSeqno = packet.getSeqno(); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Queued packet " + packet.getSeqno()); + } + dataQueue.notifyAll(); + } + } + + /** + * For heartbeat packets, create buffer directly by new byte[] + * since heartbeats should not be blocked. + */ + private DFSPacket createHeartbeatPacket() throws InterruptedIOException { + final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN]; + return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 0, false); + } + + private LoadingCache initExcludedNodes() { + return CacheBuilder.newBuilder().expireAfterWrite( + dfsClient.getConf().excludedNodesCacheExpiry, TimeUnit.MILLISECONDS) + .removalListener(new RemovalListener() { + @Override + public void onRemoval( + RemovalNotification notification) { + DFSClient.LOG.info("Removing node " + notification.getKey() + + " from the excluded nodes list"); + } + }).build(new CacheLoader() { + @Override + public DatanodeInfo load(DatanodeInfo key) throws Exception { + return key; + } + }); + } + + private static void arraycopy(T[] srcs, T[] dsts, int skipIndex) { + System.arraycopy(srcs, 0, dsts, 0, skipIndex); + System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex); + } + + /** + * check if to persist blocks on namenode + * + * @return if to persist blocks on namenode + */ + AtomicBoolean getPersistBlocks(){ + return persistBlocks; + } + + /** + * check if to append a chunk + * + * @param appendChunk if to append a chunk + */ + void setAppendChunk(boolean appendChunk){ + this.appendChunk = appendChunk; + } + + /** + * get if to append a chunk + * + * @return if to append a chunk + */ + boolean getAppendChunk(){ + return appendChunk; + } + + /** + * get the last exception + * + * @return the last exception + */ + AtomicReference getLastException(){ + return lastException; + } + + /** + * get the socket connecting to the first datanode in pipeline + * + * @return socket connecting to the first datanode in pipeline + */ + Socket getSocket() { + return s; + } + + /** + * set socket to null + */ + void setSocketToNull() { + this.s = null; + } + + /** + * return current sequence number and then increase it by 1 + * + * @return current sequence number before increasing + */ + long getAndIncCurrentSeqno() { + long old = this.currentSeqno; + this.currentSeqno++; + return old; + } + + /** + * get last queued sequence number + * + * @return last queued sequence number + */ + long getLastQueuedSeqno() { + return lastQueuedSeqno; + } + + /** + * get the number of bytes of current block + * + * @return the number of bytes of current block + */ + long getBytesCurBlock() { + return bytesCurBlock; + } + + /** + * set the bytes of current block that have been written + * + * @param bytesCurBlock bytes of current block that have been written + */ + void setBytesCurBlock(long bytesCurBlock) { + this.bytesCurBlock = bytesCurBlock; + } + + /** + * increase bytes of current block by len. + * + * @param len how many bytes to increase to current block + */ + void incBytesCurBlock(long len) { + this.bytesCurBlock += len; + } + + /** + * set artificial slow down for unit test + * + * @param period artificial slow down + */ + void setArtificialSlowdown(long period) { + this.artificialSlowdown = period; + } + + /** + * if this streamer is to terminate + * + * @return if this streamer is to terminate + */ + boolean streamerClosed(){ + return streamerClosed; + } + + void closeSocket() throws IOException { + if (s != null) { + s.close(); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 493351b1b8f5e..5fc78d1bf3a7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -905,7 +905,7 @@ public static byte[] loadFile(String filename) throws IOException { public static BlockOpResponseProto transferRbw(final ExtendedBlock b, final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException { assertEquals(2, datanodes.length); - final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0], + final Socket s = DataStreamer.createSocketForPipeline(datanodes[0], datanodes.length, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 7269e3910d218..b47e7f1510549 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -51,8 +51,11 @@ public void testCloseTwice() throws IOException { DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os, "wrappedStream"); @SuppressWarnings("unchecked") + DataStreamer streamer = (DataStreamer) Whitebox + .getInternalState(dos, "streamer"); + @SuppressWarnings("unchecked") AtomicReference ex = (AtomicReference) Whitebox - .getInternalState(dos, "lastException"); + .getInternalState(streamer, "lastException"); Assert.assertEquals(null, ex.get()); dos.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index e1c547be6ffa4..fd916a942ec77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -43,6 +43,8 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.net.InetSocketAddress; import java.net.URI; import java.net.UnknownHostException; @@ -603,7 +605,8 @@ public void testFileCreationError3() throws IOException { * Test that file leases are persisted across namenode restarts. */ @Test - public void testFileCreationNamenodeRestart() throws IOException { + public void testFileCreationNamenodeRestart() + throws IOException, NoSuchFieldException, IllegalAccessException { Configuration conf = new HdfsConfiguration(); final int MAX_IDLE_TIME = 2000; // 2s conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME); @@ -702,11 +705,18 @@ public void testFileCreationNamenodeRestart() throws IOException { // new blocks for files that were renamed. DFSOutputStream dfstream = (DFSOutputStream) (stm.getWrappedStream()); - dfstream.setTestFilename(file1.toString()); + + Field f = DFSOutputStream.class.getDeclaredField("src"); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL); + f.setAccessible(true); + + f.set(dfstream, file1.toString()); dfstream = (DFSOutputStream) (stm3.getWrappedStream()); - dfstream.setTestFilename(file3new.toString()); + f.set(dfstream, file3new.toString()); dfstream = (DFSOutputStream) (stm4.getWrappedStream()); - dfstream.setTestFilename(file4new.toString()); + f.set(dfstream, file4new.toString()); // write 1 byte to file. This should succeed because the // namenode should have persisted leases.