
Commit 9b4c9b0
HDFS-642. Support pipeline close and close error recovery. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@820062 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang committed Sep 29, 2009
1 parent 66064f8 commit 9b4c9b0
Showing 13 changed files with 383 additions and 177 deletions.
src/java/org/apache/hadoop/hdfs/DFSClient.java: 151 changes (87 additions, 64 deletions)
@@ -1431,8 +1431,8 @@ protected synchronized int readChunk(long pos, byte[] buf, int offset,
       int dataLen = in.readInt();
 
       // Sanity check the lengths
-      if ( dataLen < 0 ||
-           ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+      if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
+           ( dataLen != 0 && lastPacketInBlock) ||
            (seqno != (lastSeqNo + 1)) ) {
         throw new IOException("BlockReader: error in packet header" +
                               "(chunkOffset : " + chunkOffset +
@@ -2598,7 +2598,16 @@ private void initDataStreaming() {
       response.start();
       stage = BlockConstructionStage.DATA_STREAMING;
     }
+
+    private void endBlock() {
+      LOG.debug("Closing old block " + block);
+      this.setName("DataStreamer for file " + src);
+      closeResponder();
+      closeStream();
+      nodes = null;
+      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+    }
 
     /*
      * streamer thread is the only thread that opens streams to datanode,
      * and closes them. Any error recovery is also done by this thread.
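endBlock() is the counterpart of initDataStreaming(): it tears the pipeline down and returns the streamer to PIPELINE_SETUP_CREATE, so the next packet allocates a fresh block. With the PIPELINE_CLOSE stage introduced below, the per-block life cycle visible in this patch is a three-state loop; a sketch for orientation (the enum constants appear in the patch, the transition method is illustrative only):

// The three streamer stages this patch exercises, as a closed loop.
enum Stage { PIPELINE_SETUP_CREATE, DATA_STREAMING, PIPELINE_CLOSE }

class StageLoop {
  static Stage next(Stage s) {
    switch (s) {
      case PIPELINE_SETUP_CREATE: return Stage.DATA_STREAMING;        // initDataStreaming()
      case DATA_STREAMING:        return Stage.PIPELINE_CLOSE;        // last packet queued
      case PIPELINE_CLOSE:        return Stage.PIPELINE_SETUP_CREATE; // endBlock()
      default: throw new AssertionError(s);
    }
  }
}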
@@ -2642,8 +2651,6 @@ public void run() {
             one = dataQueue.getFirst();
           }
 
-          long offsetInBlock = one.offsetInBlock;
-
           // get new block from namenode.
           if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
             LOG.debug("Allocating new block");
@@ -2655,14 +2662,34 @@ public void run() {
             initDataStreaming();
           }
 
-          if (offsetInBlock >= blockSize) {
+          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
+          if (lastByteOffsetInBlock > blockSize) {
             throw new IOException("BlockSize " + blockSize +
                 " is smaller than data size. " +
                 " Offset of packet in block " +
-                offsetInBlock +
+                lastByteOffsetInBlock +
                 " Aborting file " + src);
           }
 
+          if (one.lastPacketInBlock) {
+            // wait until all data packets have been successfully acked
+            synchronized (dataQueue) {
+              while (!streamerClosed && !hasError &&
+                  ackQueue.size() != 0 && clientRunning) {
+                try {
+                  // wait for acks to arrive from datanodes
+                  dataQueue.wait(1000);
+                } catch (InterruptedException e) {
+                }
+              }
+            }
+            if (streamerClosed || hasError || !clientRunning) {
+              continue;
+            }
+            stage = BlockConstructionStage.PIPELINE_CLOSE;
+          }
+
+          // send the packet
           ByteBuffer buf = one.getBuffer();
 
           synchronized (dataQueue) {
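The close protocol above is two-phase: first drain the ack queue so every data packet is known to be durable, and only then enter PIPELINE_CLOSE and send the empty last packet. A self-contained sketch of that drain step, with hypothetical names standing in for the streamer's dataQueue/ackQueue pair:

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical reduction of the drain-then-close wait: the close packet
// may only be sent once every outstanding data packet has been acked.
class AckDrain {
  private final Deque<Long> ackQueue = new ArrayDeque<>(); // unacked seqnos
  private volatile boolean running = true;

  // Streamer side: block until all acks are in (or we are shut down).
  synchronized void awaitAllAcks() throws InterruptedException {
    while (running && !ackQueue.isEmpty()) {
      wait(1000); // re-check once a second, like dataQueue.wait(1000)
    }
  }

  // Responder side: one ack arrived; wake the streamer.
  synchronized void ackOne() {
    ackQueue.pollFirst();
    notifyAll();
  }

  // Streamer side: register a packet that now awaits an ack.
  synchronized void track(long seqno) {
    ackQueue.addLast(seqno);
  }
}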
@@ -2674,11 +2701,7 @@ public void run() {
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("DataStreamer block " + block +
-                " sending packet seqno:" + one.seqno +
-                " size:" + buf.remaining() +
-                " offsetInBlock:" + one.offsetInBlock +
-                " lastPacketInBlock:" + one.lastPacketInBlock +
-                " lastByteOffsetInBlock" + one.getLastByteOffsetBlock());
+                " sending packet " + one);
           }
 
           // write out data to remote datanode
@@ -2690,22 +2713,31 @@ public void run() {
             if (bytesSent < tmpBytesSent) {
               bytesSent = tmpBytesSent;
             }
 
+            if (streamerClosed || hasError || !clientRunning) {
+              continue;
+            }
+
             // Is this block full?
             if (one.lastPacketInBlock) {
+              // wait until the close packet has been acked
              synchronized (dataQueue) {
-                while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
-                  try {
-                    dataQueue.wait(1000); // wait for acks to arrive from datanodes
-                  } catch (InterruptedException e) {
-                  }
-                }
+                while (!streamerClosed && !hasError &&
+                    ackQueue.size() != 0 && clientRunning) {
+                  dataQueue.wait(1000); // wait for acks to arrive from datanodes
+                }
               }
-
-              if (ackQueue.isEmpty()) { // done receiving all acks
-                // indicate end-of-block
-                blockStream.writeInt(0);
-                blockStream.flush();
-              }
+              if (streamerClosed || hasError || !clientRunning) {
+                continue;
+              }
+
+              endBlock();
             }
+            if (progress != null) { progress.progress(); }
+
+            // This is used by unit test to trigger race conditions.
+            if (artificialSlowdown != 0 && clientRunning) {
+              Thread.sleep(artificialSlowdown);
+            }
           } catch (Throwable e) {
             LOG.warn("DataStreamer Exception: " +
@@ -2718,29 +2750,6 @@ public void run() {
               streamerClosed = true;
             }
           }
-
-          if (streamerClosed || hasError || !clientRunning) {
-            continue;
-          }
-
-          // Is this block full?
-          if (one.lastPacketInBlock) {
-            LOG.debug("Closing old block " + block);
-            this.setName("DataStreamer for file " + src);
-            closeResponder();
-            closeStream();
-            nodes = null;
-            stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-          }
-          if (progress != null) { progress.progress(); }
-
-          // This is used by unit test to trigger race conditions.
-          if (artificialSlowdown != 0 && clientRunning) {
-            try {
-              Thread.sleep(artificialSlowdown);
-            } catch (InterruptedException e) {}
-          }
         }
         closeInternal();
       }
@@ -2928,7 +2937,15 @@ private boolean processDatanodeError() throws IOException {
       boolean doSleep = setupPipelineForAppendOrRecovery();
 
       if (!streamerClosed && clientRunning) {
-        initDataStreaming();
+        if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+          synchronized (dataQueue) {
+            dataQueue.remove(); // remove the end of block packet
+            dataQueue.notifyAll();
+          }
+          endBlock();
+        } else {
+          initDataStreaming();
+        }
       }
 
       return doSleep;
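This is the error-recovery half of the feature: when the pipeline fails while the end-of-block packet is outstanding, every data packet has already been acked, so after setupPipelineForAppendOrRecovery() rebuilds the pipeline there is nothing left to stream. The recovery path therefore discards the queued close packet and finishes the block instead of re-entering data streaming. A compilable skeleton of that decision, with stubbed stand-ins for the streamer methods (not the real DFSClient internals):

import java.util.ArrayDeque;
import java.util.Deque;

// Skeleton of the post-recovery branch; endBlock()/initDataStreaming()
// are stubs here, assumed to behave like the methods in this patch.
class CloseRecoveryDemo {
  enum Stage { DATA_STREAMING, PIPELINE_CLOSE }

  private final Deque<Object> dataQueue = new ArrayDeque<>();

  void resumeAfterRecovery(Stage stage) {
    if (stage == Stage.PIPELINE_CLOSE) {
      synchronized (dataQueue) {
        dataQueue.remove();    // drop the queued end-of-block packet
        dataQueue.notifyAll(); // wake writers blocked on queue space
      }
      endBlock();              // data already acked; just finish the block
    } else {
      initDataStreaming();     // mid-block failure: resume streaming
    }
  }

  void endBlock() { /* reset streamer state for the next block */ }
  void initDataStreaming() { /* restart the response processor */ }
}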
@@ -3392,15 +3409,6 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] che
             ", blockSize=" + blockSize +
             ", appendChunk=" + appendChunk);
       }
-      //
-      // if we allocated a new packet because we encountered a block
-      // boundary, reset bytesCurBlock.
-      //
-      if (bytesCurBlock == blockSize) {
-        currentPacket.lastPacketInBlock = true;
-        bytesCurBlock = 0;
-        lastFlushOffset = -1;
-      }
       waitAndQueuePacket(currentPacket);
       currentPacket = null;
 
@@ -3413,6 +3421,20 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] che
       }
       int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
       computePacketChunkSize(psize, bytesPerChecksum);
+
+      //
+      // if encountering a block boundary, send an empty packet to
+      // indicate the end of block and reset bytesCurBlock.
+      //
+      if (bytesCurBlock == blockSize) {
+        currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+            bytesCurBlock);
+        currentPacket.lastPacketInBlock = true;
+        waitAndQueuePacket(currentPacket);
+        currentPacket = null;
+        bytesCurBlock = 0;
+        lastFlushOffset = -1;
+      }
     }
   }
 
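With this change the end of a block is always signalled by a real, empty packet flagged lastPacketInBlock, sized to hold just the fixed header plus the 4-byte length word (DataNode.PKT_HEADER_LEN+4) and zero chunks, rather than by a bare zero integer written on the stream. A small simulation of the boundary bookkeeping (constants and types here are hypothetical):

// Simulates the block-boundary rule above: once bytesCurBlock reaches
// blockSize, an empty lastPacketInBlock packet is queued and the
// per-block counters reset.
class BoundaryDemo {
  static final long BLOCK_SIZE = 4096; // hypothetical tiny block
  long bytesCurBlock = 0;

  // Returns true when the caller must queue an empty close packet.
  boolean afterChunk(int chunkLen) {
    bytesCurBlock += chunkLen;
    if (bytesCurBlock == BLOCK_SIZE) {
      bytesCurBlock = 0; // next chunk starts a new block
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    BoundaryDemo d = new BoundaryDemo();
    for (int chunk = 1; chunk <= 10; chunk++) {
      if (d.afterChunk(512)) {
        System.out.println("close packet after chunk " + chunk); // chunk 8
      }
    }
  }
}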
@@ -3556,21 +3578,22 @@ public synchronized void close() throws IOException {
     try {
       flushBuffer();       // flush from all upper layers
 
-      // Mark that this packet is the last packet in block.
-      // If there are no outstanding packets and the last packet
-      // was not the last one in the current block, then create a
-      // packet with empty payload.
-      if (currentPacket == null && bytesCurBlock != 0) {
-        currentPacket = new Packet(packetSize, chunksPerPacket,
-            bytesCurBlock);
-      }
       if (currentPacket != null) {
         waitAndQueuePacket(currentPacket);
       }
+
+      if (bytesCurBlock != 0) {
+        // send an empty packet to mark the end of the block
+        currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+            bytesCurBlock);
+        currentPacket.lastPacketInBlock = true;
+      }
 
       flushInternal();             // flush all data to Datanodes
       LOG.info("Done flushing");
+      // get last block before destroying the streamer
+      Block lastBlock = streamer.getBlock();
       LOG.info("Closing the streams...");
       closeThreads(false);
       completeFile(lastBlock);
       leasechecker.remove(src);
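The close() path now mirrors the streamer's protocol: flush the partial packet, add an empty end-of-block packet if the block still has unflushed bytes, wait for all acks, and capture the last block before the streamer is torn down, since completeFile() needs it afterwards. A hedged outline of that ordering (the Streamer interface below is a stand-in, not the DFSClient API):

// Outline of the close() ordering; every method on Streamer is a
// hypothetical stand-in for the corresponding DFSOutputStream step.
interface Streamer {
  void queue(Object packet);   // waitAndQueuePacket(...)
  void flushAndAwaitAcks();    // flushInternal()
  Object lastBlock();          // streamer.getBlock()
  void shutdown();             // closeThreads(false)
}

final class CloseOrdering {
  static Object close(Streamer s, Object pendingPacket, long bytesCurBlock) {
    if (pendingPacket != null) {
      s.queue(pendingPacket);  // drain the half-filled packet
    }
    if (bytesCurBlock != 0) {
      s.queue(new Object() /* empty end-of-block packet */);
    }
    s.flushAndAwaitAcks();     // everything acked from here on
    Object last = s.lastBlock(); // grab before teardown
    s.shutdown();
    return last;               // handed to completeFile(...)
  }
}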
