Skip to content

Commit

Permalink
HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer …
Browse files Browse the repository at this point in the history
…to tolerate datanode failure. Contributed by Tsz Wo Nicholas Sze.
  • Loading branch information
Jing9 committed Jun 19, 2015
1 parent 05c6968 commit 3682e01
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 209 deletions.
Expand Up @@ -189,16 +189,6 @@ interface StripedRead {
int THREADPOOL_SIZE_DEFAULT = 18;
}

/**
 * dfs.client.write.striped configuration properties.
 * Every key below is formed by appending a suffix to {@link #PREFIX}.
 */
interface StripedWrite {
// Common prefix for all keys in this interface: Write.PREFIX followed by "striped.".
String PREFIX = Write.PREFIX + "striped.";

// Key and default value (90) for the "max-seconds-get-striped-block" setting.
// NOTE(review): presumably a wait limit in seconds, judging by the key name -- confirm against the code that reads it.
String MAX_SECONDS_GET_STRIPED_BLOCK_KEY = PREFIX + "max-seconds-get-striped-block";
int MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT = 90;
// Key and default value (60) for the "max-seconds-get-ended-block" setting.
// NOTE(review): presumably also a wait limit in seconds -- confirm against the code that reads it.
String MAX_SECONDS_GET_ENDED_BLOCK_KEY = PREFIX + "max-seconds-get-ended-block";
int MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT = 60;
}

/** dfs.http.client configuration properties */
interface HttpClient {
String PREFIX = "dfs.http.client.";
Expand Down
5 changes: 4 additions & 1 deletion hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
Expand Up @@ -307,4 +307,7 @@
StripedBlocksFeature. (Walter Su via jing9)

HDFS-8466. Refactor BlockInfoContiguous and fix NPE in
TestBlockInfo#testCopyConstructor() (vinayakumarb)
TestBlockInfo#testCopyConstructor() (vinayakumarb)

HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer to
tolerate datanode failure. (Tsz Wo Nicholas Sze via jing9)
Expand Up @@ -28,7 +28,6 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
Expand All @@ -40,7 +39,6 @@
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
Expand All @@ -51,27 +49,33 @@
import com.google.common.base.Preconditions;


/****************************************************************
* The DFSStripedOutputStream class supports writing files in striped
* layout. Each stripe contains a sequence of cells and multiple
* {@link StripedDataStreamer}s in DFSStripedOutputStream are responsible
* for writing the cells to different datanodes.
*
****************************************************************/

/**
* This class supports writing files in striped layout and erasure coded format.
* Each stripe contains a sequence of cells.
*/
@InterfaceAudience.Private
public class DFSStripedOutputStream extends DFSOutputStream {
static class MultipleBlockingQueue<T> {
private final int pullTimeout;
private final List<BlockingQueue<T>> queues;

MultipleBlockingQueue(int numQueue, int queueSize, int pullTimeout) {
MultipleBlockingQueue(int numQueue, int queueSize) {
queues = new ArrayList<>(numQueue);
for (int i = 0; i < numQueue; i++) {
queues.add(new LinkedBlockingQueue<T>(queueSize));
}
}

/** @return true only when none of the underlying queues holds an element. */
boolean isEmpty() {
  for (final BlockingQueue<T> queue : queues) {
    if (!queue.isEmpty()) {
      // One non-empty queue is enough to answer "not empty".
      return false;
    }
  }
  return true;
}

this.pullTimeout = pullTimeout;
/** @return the number of queues managed by this object. */
int numQueues() {
  final int count = queues.size();
  return count;
}

void offer(int i, T object) {
Expand All @@ -80,74 +84,107 @@ void offer(int i, T object) {
+ " to queue, i=" + i);
}

T poll(int i) throws InterruptedIOException {
T take(int i) throws InterruptedIOException {
try {
return queues.get(i).poll(pullTimeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw DFSUtil.toInterruptedIOException("poll interrupted, i=" + i, e);
return queues.get(i).take();
} catch(InterruptedException ie) {
throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, ie);
}
}

/**
 * Remove and return the head of the i-th queue without waiting.
 *
 * @param i index of the queue to read from
 * @return the head element, or null when that queue is empty
 */
T poll(int i) {
  final BlockingQueue<T> queue = queues.get(i);
  return queue.poll();
}

/**
 * Return the head of the i-th queue, leaving it in the queue.
 *
 * @param i index of the queue to inspect
 * @return the head element, or null when that queue is empty
 */
T peek(int i) {
  final BlockingQueue<T> queue = queues.get(i);
  return queue.peek();
}
}

/** Coordinate the communication between the streamers. */
static class Coordinator {
private final MultipleBlockingQueue<LocatedBlock> stripedBlocks;
class Coordinator {
private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
private final MultipleBlockingQueue<ExtendedBlock> endBlocks;

private final MultipleBlockingQueue<LocatedBlock> newBlocks;
private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;

Coordinator(final DfsClientConf conf, final int numDataBlocks,
final int numAllBlocks) {
stripedBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1,
conf.getStripedWriteMaxSecondsGetStripedBlock());
endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1,
conf.getStripedWriteMaxSecondsGetEndedBlock());
updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1,
conf.getStripedWriteMaxSecondsGetStripedBlock());
followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1);

newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
}

void putEndBlock(int i, ExtendedBlock block) {
endBlocks.offer(i, block);
MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
return followingBlocks;
}

/** @return the per-streamer queues stored in the {@code newBlocks} field. */
MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
  return this.newBlocks;
}

ExtendedBlock getEndBlock(int i) throws InterruptedIOException {
return endBlocks.poll(i);
MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() {
return updateBlocks;
}

void putUpdateBlock(int i, ExtendedBlock block) {
updateBlocks.offer(i, block);
StripedDataStreamer getStripedDataStreamer(int i) {
return DFSStripedOutputStream.this.getStripedDataStreamer(i);
}

ExtendedBlock getUpdateBlock(int i) throws InterruptedIOException {
return updateBlocks.poll(i);
void offerEndBlock(int i, ExtendedBlock block) {
endBlocks.offer(i, block);
}

/**
 * Take the entry from the i-th end-block queue via {@code endBlocks.take(i)}.
 *
 * @param i index of the end-block queue
 * @return the block obtained from that queue
 * @throws InterruptedIOException propagated from {@code endBlocks.take(i)}
 */
ExtendedBlock takeEndBlock(int i) throws InterruptedIOException {
  final ExtendedBlock ended = endBlocks.take(i);
  return ended;
}

/**
 * Check whether every end-block queue currently has an entry at its head.
 *
 * @return false as soon as one queue's peek yields null, true otherwise
 */
boolean hasAllEndBlocks() {
  int index = 0;
  while (index < endBlocks.numQueues()) {
    final ExtendedBlock head = endBlocks.peek(index);
    if (head == null) {
      return false;
    }
    index++;
  }
  return true;
}

void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
ExtendedBlock b = endBlocks.peek(i);
if (b == null) {
// streamer just has failed, put end block and continue
b = block;
putEndBlock(i, b);
offerEndBlock(i, b);
}
b.setNumBytes(newBytes);
}

void putStripedBlock(int i, LocatedBlock block) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("putStripedBlock " + block + ", i=" + i);
/** @return a block representing the entire block group. */
ExtendedBlock getBlockGroup() {
final StripedDataStreamer s0 = getStripedDataStreamer(0);
final ExtendedBlock b0 = s0.getBlock();
if (b0 == null) {
return null;
}
stripedBlocks.offer(i, block);
}

LocatedBlock getStripedBlock(int i) throws IOException {
final LocatedBlock lb = stripedBlocks.poll(i);
if (lb == null) {
throw new IOException("Failed: i=" + i);
final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
final ExtendedBlock block = new ExtendedBlock(b0);
long numBytes = b0.getNumBytes();
for (int i = 1; i < numDataBlocks; i++) {
final StripedDataStreamer si = getStripedDataStreamer(i);
final ExtendedBlock bi = si.getBlock();
if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
block.setGenerationStamp(bi.getGenerationStamp());
}
numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
}
return lb;
block.setNumBytes(numBytes);
if (LOG.isDebugEnabled()) {
LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes());
}
return block;
}
}

Expand Down Expand Up @@ -223,13 +260,9 @@ private void flipDataBuffers() {
private final int numAllBlocks;
private final int numDataBlocks;

/** @return the streamer stored at index 0 of {@code streamers}. */
private StripedDataStreamer getLeadingStreamer() {
  final int leadingIndex = 0;
  return streamers.get(leadingIndex);
}

@Override
ExtendedBlock getBlock() {
return getLeadingStreamer().getBlock();
return coordinator.getBlockGroup();
}

/** Construct a new output stream for creating a file. */
Expand Down Expand Up @@ -308,7 +341,9 @@ private void checkStreamers() throws IOException {
int count = 0;
for(StripedDataStreamer s : streamers) {
if (!s.isFailed()) {
s.getErrorState().initExtenalError();
if (s.getBlock() != null) {
s.getErrorState().initExternalError();
}
count++;
}
}
Expand All @@ -325,7 +360,7 @@ private void checkStreamers() throws IOException {
private void handleStreamerFailure(String err,
Exception e) throws IOException {
LOG.warn("Failed: " + err + ", " + this, e);
getCurrentStreamer().setIsFailed(true);
getCurrentStreamer().setFailed(true);
checkStreamers();
currentPacket = null;
}
Expand Down Expand Up @@ -443,10 +478,17 @@ synchronized void abort() throws IOException {
dfsClient.endFileLease(fileId);
}

//TODO: Handle slow writers (HDFS-7786)
//Currently only check if the leading streamer is terminated
@Override
boolean isClosed() {
return closed || getLeadingStreamer().streamerClosed();
if (closed) {
return true;
}
for(StripedDataStreamer s : streamers) {
if (!s.streamerClosed()) {
return false;
}
}
return true;
}

@Override
Expand Down Expand Up @@ -560,7 +602,19 @@ void setClosed() {
@Override
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
getLeadingStreamer().getLastException().check(true);
final MultipleIOException.Builder b = new MultipleIOException.Builder();
for(int i = 0; i < streamers.size(); i++) {
final StripedDataStreamer si = getStripedDataStreamer(i);
try {
si.getLastException().check(true);
} catch (IOException e) {
b.add(e);
}
}
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
return;
}

Expand Down Expand Up @@ -594,7 +648,7 @@ protected synchronized void closeImpl() throws IOException {
}

closeThreads(false);
final ExtendedBlock lastBlock = getCommittedBlock();
final ExtendedBlock lastBlock = coordinator.getBlockGroup();
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try {
completeFile(lastBlock);
Expand All @@ -607,30 +661,4 @@ protected synchronized void closeImpl() throws IOException {
setClosed();
}
}

/**
 * Build the block that is reported to, and committed in, the NameNode.
 * Its size is the leading streamer's block size plus a contribution from
 * each remaining data streamer: that streamer's block length when the
 * writer sits exactly at a block group boundary, otherwise its
 * bytesCurBlock. All streamers are expected to be closed by the time this
 * runs, and the computation also covers streamers that failed to write.
 *
 * @return an ExtendedBlock sized for the whole block group, or null when
 *         the leading streamer has no block
 */
ExtendedBlock getCommittedBlock() throws IOException {
  final StripedDataStreamer leader = getLeadingStreamer();
  final ExtendedBlock leadingBlock = leader.getBlock();
  if (leadingBlock == null) {
    return null;
  }

  final ExtendedBlock committed = new ExtendedBlock(leadingBlock);

  // At a boundary the leader has written nothing into a fresh block yet,
  // while its block object still reports a positive length.
  final boolean atBlockGroupBoundary = leader.getBytesCurBlock() == 0
      && leader.getBlock() != null
      && leader.getBlock().getNumBytes() > 0;

  long total = committed.getNumBytes();
  for (int idx = 1; idx < numDataBlocks; idx++) {
    final StripedDataStreamer streamer = streamers.get(idx);
    if (atBlockGroupBoundary) {
      total += streamer.getBlock().getNumBytes();
    } else {
      total += streamer.getBytesCurBlock();
    }
  }
  committed.setNumBytes(total);
  return committed;
}
}

0 comments on commit 3682e01

Please sign in to comment.