diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 80d958d07c792..0c66309b20c7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -376,6 +376,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7978. Add LOG.isDebugEnabled() guard for some LOG.debug(..).
     (Walter Su via wang)
 
+    HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of
+    subclassing. (Li Bo via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index c88639da030d5..f6733e36b4afb 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -20,7 +20,6 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.net.Socket;
 import java.nio.channels.ClosedChannelException;
 import java.util.EnumSet;
 import java.util.concurrent.atomic.AtomicReference;
@@ -95,29 +94,29 @@ public class DFSOutputStream extends FSOutputSummer
   static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
       CryptoProtocolVersion.supported();
 
-  private final DFSClient dfsClient;
-  private final ByteArrayManager byteArrayManager;
+  protected final DFSClient dfsClient;
+  protected final ByteArrayManager byteArrayManager;
   // closed is accessed by different threads under different locks.
-  private volatile boolean closed = false;
-
-  private final String src;
-  private final long fileId;
-  private final long blockSize;
-  private final int bytesPerChecksum;
-
-  private DFSPacket currentPacket = null;
-  private DataStreamer streamer;
-  private int packetSize = 0; // write packet size, not including the header.
-  private int chunksPerPacket = 0;
-  private long lastFlushOffset = 0; // offset when flush was invoked
+  protected volatile boolean closed = false;
+
+  protected final String src;
+  protected final long fileId;
+  protected final long blockSize;
+  protected final int bytesPerChecksum;
+
+  protected DFSPacket currentPacket = null;
+  protected DataStreamer streamer;
+  protected int packetSize = 0; // write packet size, not including the header.
+  protected int chunksPerPacket = 0;
+  protected long lastFlushOffset = 0; // offset when flush was invoked
   private long initialFileSize = 0; // at time of file open
   private final short blockReplication; // replication factor of file
-  private boolean shouldSyncBlock = false; // force blocks to disk upon close
-  private final AtomicReference<CachingStrategy> cachingStrategy;
+  protected boolean shouldSyncBlock = false; // force blocks to disk upon close
+  protected final AtomicReference<CachingStrategy> cachingStrategy;
   private FileEncryptionInfo fileEncryptionInfo;
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
-  private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+  protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
       long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
     final byte[] buf;
     final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
@@ -206,7 +205,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
   }
 
   /** Construct a new output stream for creating a file. */
-  private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+  protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
       EnumSet<CreateFlag> flag, Progressable progress,
       DataChecksum checksum, String[] favoredNodes) throws IOException {
     this(dfsClient, src, progress, stat, checksum);
@@ -359,7 +358,7 @@ static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
     }
   }
 
-  private void computePacketChunkSize(int psize, int csize) {
+  protected void computePacketChunkSize(int psize, int csize) {
     final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
     final int chunkSize = csize + getChecksumSize();
     chunksPerPacket = Math.max(bodySize/chunkSize, 1);
@@ -426,33 +425,46 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
       streamer.waitAndQueuePacket(currentPacket);
       currentPacket = null;
 
-      // If the reopened file did not end at chunk boundary and the above
-      // write filled up its partial chunk. Tell the summer to generate full
-      // crc chunks from now on.
-      if (streamer.getAppendChunk() &&
-          streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
-        streamer.setAppendChunk(false);
-        resetChecksumBufSize();
-      }
+      adjustChunkBoundary();
 
-      if (!streamer.getAppendChunk()) {
-        int psize = Math.min((int)(blockSize-streamer.getBytesCurBlock()),
-            dfsClient.getConf().writePacketSize);
-        computePacketChunkSize(psize, bytesPerChecksum);
-      }
-      //
-      // if encountering a block boundary, send an empty packet to
-      // indicate the end of block and reset bytesCurBlock.
-      //
-      if (streamer.getBytesCurBlock() == blockSize) {
-        currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
-            streamer.getAndIncCurrentSeqno(), true);
-        currentPacket.setSyncBlock(shouldSyncBlock);
-        streamer.waitAndQueuePacket(currentPacket);
-        currentPacket = null;
-        streamer.setBytesCurBlock(0);
-        lastFlushOffset = 0;
-      }
+      endBlock();
+    }
+  }
+
+  /**
+   * If the reopened file did not end at a chunk boundary and the above
+   * write filled up its partial chunk, tell the summer to generate full
+   * crc chunks from now on.
+   */
+  protected void adjustChunkBoundary() {
+    if (streamer.getAppendChunk() &&
+        streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
+      streamer.setAppendChunk(false);
+      resetChecksumBufSize();
+    }
+
+    if (!streamer.getAppendChunk()) {
+      int psize = Math.min((int)(blockSize - streamer.getBytesCurBlock()),
+          dfsClient.getConf().writePacketSize);
+      computePacketChunkSize(psize, bytesPerChecksum);
+    }
+  }
+
+  /**
+   * If encountering a block boundary, send an empty packet to
+   * indicate the end of block and reset bytesCurBlock.
+   *
+   * @throws IOException
+   */
+  protected void endBlock() throws IOException {
+    if (streamer.getBytesCurBlock() == blockSize) {
+      currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+          streamer.getAndIncCurrentSeqno(), true);
+      currentPacket.setSyncBlock(shouldSyncBlock);
+      streamer.waitAndQueuePacket(currentPacket);
+      currentPacket = null;
+      streamer.setBytesCurBlock(0);
+      lastFlushOffset = 0;
     }
   }
 
@@ -676,7 +688,7 @@ public synchronized int getCurrentBlockReplication() throws IOException {
    * Waits till all existing data is flushed and confirmations
    * received from datanodes.
    */
-  private void flushInternal() throws IOException {
+  protected void flushInternal() throws IOException {
     long toWaitFor;
     synchronized (this) {
       dfsClient.checkOpen();
@@ -692,7 +704,7 @@ private void flushInternal() throws IOException {
     streamer.waitForAckedSeqno(toWaitFor);
   }
 
-  private synchronized void start() {
+  protected synchronized void start() {
     streamer.start();
   }
 
@@ -721,7 +733,7 @@ void setClosed() {
 
   // shutdown datastreamer and responseprocessor threads.
   // interrupt datastreamer if force is true
-  private void closeThreads(boolean force) throws IOException {
+  protected void closeThreads(boolean force) throws IOException {
     try {
       streamer.close(force);
       streamer.join();
@@ -749,7 +761,7 @@ public synchronized void close() throws IOException {
     }
   }
 
-  private synchronized void closeImpl() throws IOException {
+  protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
       IOException e = streamer.getLastException().getAndSet(null);
       if (e == null)
@@ -761,7 +773,7 @@ private synchronized void closeImpl() throws IOException {
 
     try {
       flushBuffer(); // flush from all upper layers
-      if (currentPacket != null) { 
+      if (currentPacket != null) {
         streamer.waitAndQueuePacket(currentPacket);
         currentPacket = null;
       }
@@ -792,7 +804,7 @@ private synchronized void closeImpl() throws IOException {
 
   // should be called holding (this) lock since setTestFilename() may
   // be called during unit tests
-  private void completeFile(ExtendedBlock last) throws IOException {
+  protected void completeFile(ExtendedBlock last) throws IOException {
     long localstart = Time.monotonicNow();
     long sleeptime = dfsClient.getConf().
        blockWriteLocateFollowingInitialDelayMs;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 6ff4c2427a621..6bcbfde19fcbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1519,7 +1519,7 @@ private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
     }
   }
 
-  private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
       throws IOException {
     int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
     long sleeptime = dfsClient.getConf().
@@ -1728,15 +1728,6 @@ AtomicReference<IOException> getLastException(){
     return lastException;
   }
 
-  /**
-   * get the socket connecting to the first datanode in pipeline
-   *
-   * @return socket connecting to the first datanode in pipeline
-   */
-  Socket getSocket() {
-    return s;
-  }
-
   /**
    * set socket to null
    */
@@ -1814,4 +1805,4 @@ void closeSocket() throws IOException {
       s.close();
     }
   }
-}
\ No newline at end of file
+}
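
For illustration only, not part of the patch: the point of these visibility
changes is to let a subclass reuse DFSOutputStream's write path while hooking
the chunk-boundary and block-boundary steps. A minimal sketch, assuming a
hypothetical subclass LoggingDFSOutputStream (the name and the debug logging
are invented) that uses only the members made protected above:

package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;

/** Hypothetical example subclass; not part of HDFS. */
class LoggingDFSOutputStream extends DFSOutputStream {

  LoggingDFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
      EnumSet<CreateFlag> flag, Progressable progress, DataChecksum checksum,
      String[] favoredNodes) throws IOException {
    // Delegates to the constructor this patch changes from private to
    // protected.
    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
  }

  @Override
  protected void adjustChunkBoundary() {
    // blockSize and streamer are readable here now that the fields are
    // protected.
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("adjustChunkBoundary: bytesCurBlock="
          + streamer.getBytesCurBlock() + ", blockSize=" + blockSize);
    }
    super.adjustChunkBoundary();
  }

  @Override
  protected void endBlock() throws IOException {
    super.endBlock();
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("endBlock: src=" + src);
    }
  }
}

A real subclass would also need a creation path on the DFSClient side; the
sketch only shows which override points the patch opens up.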