Skip to content

Commit

Permalink
HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of…
Browse files Browse the repository at this point in the history
… subclassing. Contributed by Li Bo
  • Loading branch information
Tsz-Wo Nicholas Sze committed Apr 2, 2015
1 parent 867d5d2 commit 9ed43f2
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 63 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -376,6 +376,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7978. Add LOG.isDebugEnabled() guard for some LOG.debug(..).
(Walter Su via wang)

HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of
subclassing. (Li Bo via szetszwo)

OPTIMIZATIONS

HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.Socket;
import java.nio.channels.ClosedChannelException;
import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -95,29 +94,29 @@ public class DFSOutputStream extends FSOutputSummer
static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
CryptoProtocolVersion.supported();

private final DFSClient dfsClient;
private final ByteArrayManager byteArrayManager;
protected final DFSClient dfsClient;
protected final ByteArrayManager byteArrayManager;
// closed is accessed by different threads under different locks.
private volatile boolean closed = false;

private final String src;
private final long fileId;
private final long blockSize;
private final int bytesPerChecksum;

private DFSPacket currentPacket = null;
private DataStreamer streamer;
private int packetSize = 0; // write packet size, not including the header.
private int chunksPerPacket = 0;
private long lastFlushOffset = 0; // offset when flush was invoked
protected volatile boolean closed = false;

protected final String src;
protected final long fileId;
protected final long blockSize;
protected final int bytesPerChecksum;

protected DFSPacket currentPacket = null;
protected DataStreamer streamer;
protected int packetSize = 0; // write packet size, not including the header.
protected int chunksPerPacket = 0;
protected long lastFlushOffset = 0; // offset when flush was invoked
private long initialFileSize = 0; // at time of file open
private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close
private final AtomicReference<CachingStrategy> cachingStrategy;
protected boolean shouldSyncBlock = false; // force blocks to disk upon close
protected final AtomicReference<CachingStrategy> cachingStrategy;
private FileEncryptionInfo fileEncryptionInfo;

/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
final byte[] buf;
final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
Expand Down Expand Up @@ -206,7 +205,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
}

/** Construct a new output stream for creating a file. */
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes) throws IOException {
this(dfsClient, src, progress, stat, checksum);
Expand Down Expand Up @@ -359,7 +358,7 @@ static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
}
}

private void computePacketChunkSize(int psize, int csize) {
protected void computePacketChunkSize(int psize, int csize) {
final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
final int chunkSize = csize + getChecksumSize();
chunksPerPacket = Math.max(bodySize/chunkSize, 1);
Expand Down Expand Up @@ -426,33 +425,46 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;

// If the reopened file did not end at chunk boundary and the above
// write filled up its partial chunk. Tell the summer to generate full
// crc chunks from now on.
if (streamer.getAppendChunk() &&
streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
streamer.setAppendChunk(false);
resetChecksumBufSize();
}
adjustChunkBoundary();

if (!streamer.getAppendChunk()) {
int psize = Math.min((int)(blockSize-streamer.getBytesCurBlock()),
dfsClient.getConf().writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
}
//
// if encountering a block boundary, send an empty packet to
// indicate the end of block and reset bytesCurBlock.
//
if (streamer.getBytesCurBlock() == blockSize) {
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
streamer.setBytesCurBlock(0);
lastFlushOffset = 0;
}
endBlock();
}
}

/**
 * Re-aligns writing to checksum-chunk boundaries and recomputes the packet
 * layout for the remainder of the current block.
 *
 * <p>If the reopened file did not end at a chunk boundary and the preceding
 * write filled up its partial chunk (the streamer's append-chunk flag is set
 * and the bytes written to the current block are a multiple of
 * {@code bytesPerChecksum}), the flag is cleared and the summer is told to
 * generate full CRC chunks from now on.
 *
 * <p>Whenever the append-chunk flag is not set, the packet size is then
 * recomputed as the smaller of the bytes remaining in the block and the
 * configured write packet size.
 */
protected void adjustChunkBoundary() {
  if (streamer.getAppendChunk() &&
      streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
    streamer.setAppendChunk(false);
    resetChecksumBufSize();
  }

  if (!streamer.getAppendChunk()) {
    // Take the minimum in long arithmetic and only then narrow to int.
    // Casting the remaining byte count first would wrap around whenever
    // more than Integer.MAX_VALUE bytes are left in the block, yielding a
    // bogus (possibly negative) packet size. The result of the min is
    // bounded above by writePacketSize, so the narrowing cast is safe.
    final int psize = (int) Math.min(
        blockSize - streamer.getBytesCurBlock(),
        dfsClient.getConf().writePacketSize);
    computePacketChunkSize(psize, bytesPerChecksum);
  }
}

/**
 * Finishes the current block once it has been filled completely.
 *
 * <p>When the number of bytes written to the current block equals the block
 * size, an empty packet flagged as the last packet in the block is queued
 * on the streamer, after which the per-block byte counter and the last
 * flush offset are reset to zero. Otherwise this method does nothing.
 *
 * @throws IOException if creating or queueing the end-of-block packet fails
 */
protected void endBlock() throws IOException {
  // Nothing to do until the block boundary has been reached.
  if (streamer.getBytesCurBlock() != blockSize) {
    return;
  }

  // Evaluate offset first, then sequence number, as the packet arguments.
  final long offsetInBlock = streamer.getBytesCurBlock();
  final long seqno = streamer.getAndIncCurrentSeqno();

  // Empty (zero-sized, zero-chunk) packet marking the end of the block.
  currentPacket = createPacket(0, 0, offsetInBlock, seqno, true);
  currentPacket.setSyncBlock(shouldSyncBlock);
  streamer.waitAndQueuePacket(currentPacket);
  currentPacket = null;

  // Start counting afresh for the next block.
  streamer.setBytesCurBlock(0);
  lastFlushOffset = 0;
}

Expand Down Expand Up @@ -676,7 +688,7 @@ public synchronized int getCurrentBlockReplication() throws IOException {
* Waits till all existing data is flushed and confirmations
* received from datanodes.
*/
private void flushInternal() throws IOException {
protected void flushInternal() throws IOException {
long toWaitFor;
synchronized (this) {
dfsClient.checkOpen();
Expand All @@ -692,7 +704,7 @@ private void flushInternal() throws IOException {
streamer.waitForAckedSeqno(toWaitFor);
}

private synchronized void start() {
protected synchronized void start() {
streamer.start();
}

Expand Down Expand Up @@ -721,7 +733,7 @@ void setClosed() {

// shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true
private void closeThreads(boolean force) throws IOException {
protected void closeThreads(boolean force) throws IOException {
try {
streamer.close(force);
streamer.join();
Expand Down Expand Up @@ -749,7 +761,7 @@ public synchronized void close() throws IOException {
}
}

private synchronized void closeImpl() throws IOException {
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
IOException e = streamer.getLastException().getAndSet(null);
if (e == null)
Expand All @@ -761,7 +773,7 @@ private synchronized void closeImpl() throws IOException {
try {
flushBuffer(); // flush from all upper layers

if (currentPacket != null) {
if (currentPacket != null) {
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
}
Expand Down Expand Up @@ -792,7 +804,7 @@ private synchronized void closeImpl() throws IOException {

// should be called holding (this) lock since setTestFilename() may
// be called during unit tests
private void completeFile(ExtendedBlock last) throws IOException {
protected void completeFile(ExtendedBlock last) throws IOException {
long localstart = Time.monotonicNow();
long sleeptime = dfsClient.getConf().
blockWriteLocateFollowingInitialDelayMs;
Expand Down
Expand Up @@ -1519,7 +1519,7 @@ private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
}
}

private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = dfsClient.getConf().
Expand Down Expand Up @@ -1728,15 +1728,6 @@ AtomicReference<IOException> getLastException(){
return lastException;
}

/**
* get the socket connecting to the first datanode in pipeline
*
* @return socket connecting to the first datanode in pipeline
*/
Socket getSocket() {
return s;
}

/**
* set socket to null
*/
Expand Down Expand Up @@ -1814,4 +1805,4 @@ void closeSocket() throws IOException {
s.close();
}
}
}
}

0 comments on commit 9ed43f2

Please sign in to comment.