Skip to content

Commit

Permalink
HDFS-8838. Erasure Coding: Tolerate datanode failures in DFSStripedOu…
Browse files Browse the repository at this point in the history
…tputStream when the data length is small. Contributed by Tsz Wo Nicholas Sze.
  • Loading branch information
Walter Su committed Aug 27, 2015
1 parent 6b6a63b commit 067ec8c
Show file tree
Hide file tree
Showing 12 changed files with 384 additions and 114 deletions.
Expand Up @@ -187,4 +187,10 @@ public boolean isShutdownInProgress() {
return shutdownInProgress.get();
}

/**
* Clears all registered shutdown hooks by emptying the {@code hooks}
* collection.
*/
public void clearShutdownHooks() {
hooks.clear();
}
}
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
Expand Up @@ -400,3 +400,6 @@

HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the
blocklocations which doesn't satisfy BlockGroupSize. (Rakesh R via zhz)

HDFS-8838. Erasure Coding: Tolerate datanode failures in DFSStripedOutputStream
when the data length is small. (szetszwo via waltersu4549)
Expand Up @@ -406,13 +406,13 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
if (currentPacket == null) {
currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
.getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
if (LOG.isDebugEnabled()) {
LOG.debug("WriteChunk allocating new packet seqno=" +
currentPacket.getSeqno() +
", src=" + src +
", packetSize=" + packetSize +
", chunksPerPacket=" + chunksPerPacket +
", bytesCurBlock=" + getStreamer().getBytesCurBlock());
", bytesCurBlock=" + getStreamer().getBytesCurBlock() + ", " + this);
}
}

Expand Down
Expand Up @@ -170,15 +170,18 @@ ExtendedBlock getBlockGroup() {
}

final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;

final ExtendedBlock block = new ExtendedBlock(b0);
long numBytes = b0.getNumBytes();
for (int i = 1; i < numDataBlocks; i++) {
long numBytes = atBlockGroupBoundary? b0.getNumBytes(): s0.getBytesCurBlock();
for (int i = 1; i < numAllBlocks; i++) {
final StripedDataStreamer si = getStripedDataStreamer(i);
final ExtendedBlock bi = si.getBlock();
if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
block.setGenerationStamp(bi.getGenerationStamp());
}
numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
if (i < numDataBlocks) {
numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
}
}
block.setNumBytes(numBytes);
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -318,8 +321,7 @@ private synchronized StripedDataStreamer getCurrentStreamer() {
return (StripedDataStreamer)streamer;
}

private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
throws IOException {
private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
// backup currentPacket for current streamer
int oldIdx = streamers.indexOf(streamer);
if (oldIdx >= 0) {
Expand Down Expand Up @@ -349,11 +351,11 @@ private static void encode(RawErasureEncoder encoder, int numData,
}


private void checkStreamers() throws IOException {
private void checkStreamers(boolean setExternalError) throws IOException {
int count = 0;
for(StripedDataStreamer s : streamers) {
if (!s.isFailed()) {
if (s.getBlock() != null) {
if (setExternalError && s.getBlock() != null) {
s.getErrorState().initExternalError();
}
count++;
Expand All @@ -369,11 +371,16 @@ private void checkStreamers() throws IOException {
}
}

private void handleStreamerFailure(String err,
Exception e) throws IOException {
private void handleStreamerFailure(String err, Exception e)
throws IOException {
handleStreamerFailure(err, e, true);
}

private void handleStreamerFailure(String err, Exception e,
boolean setExternalError) throws IOException {
LOG.warn("Failed: " + err + ", " + this, e);
getCurrentStreamer().setFailed(true);
checkStreamers();
checkStreamers(setExternalError);
currentPacket = null;
}

Expand Down Expand Up @@ -505,10 +512,10 @@ private long getCurrentSumBytes() {
return sum;
}

private void writeParityCellsForLastStripe() throws IOException {
private boolean generateParityCellsForLastStripe() {
final long currentBlockGroupBytes = getCurrentSumBytes();
if (currentBlockGroupBytes % stripeDataSize() == 0) {
return;
return false;
}

final int firstCellSize =
Expand All @@ -530,8 +537,7 @@ private void writeParityCellsForLastStripe() throws IOException {
}
buffers[i].flip();
}

writeParityCells();
return true;
}

void writeParityCells() throws IOException {
Expand Down Expand Up @@ -603,12 +609,14 @@ protected synchronized void closeImpl() throws IOException {
// flush from all upper layers
try {
flushBuffer();
// if the last stripe is incomplete, generate and write parity cells
writeParityCellsForLastStripe();
enqueueAllCurrentPackets();
} catch(Exception e) {
handleStreamerFailure("closeImpl", e);
handleStreamerFailure("flushBuffer " + getCurrentStreamer(), e);
}
// if the last stripe is incomplete, generate and write parity cells
if (generateParityCellsForLastStripe()) {
writeParityCells();
}
enqueueAllCurrentPackets();

for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
Expand All @@ -620,7 +628,7 @@ protected synchronized void closeImpl() throws IOException {
// flush all data to Datanode
flushInternal();
} catch(Exception e) {
handleStreamerFailure("closeImpl", e);
handleStreamerFailure("flushInternal " + s, e, false);
}
}
}
Expand All @@ -643,9 +651,13 @@ protected synchronized void closeImpl() throws IOException {
private void enqueueAllCurrentPackets() throws IOException {
int idx = streamers.indexOf(getCurrentStreamer());
for(int i = 0; i < streamers.size(); i++) {
setCurrentStreamer(i);
if (currentPacket != null) {
enqueueCurrentPacket();
final StripedDataStreamer si = setCurrentStreamer(i);
if (!si.isFailed() && currentPacket != null) {
try {
enqueueCurrentPacket();
} catch (IOException e) {
handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e, false);
}
}
}
setCurrentStreamer(idx);
Expand Down
Expand Up @@ -173,7 +173,7 @@ private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam)
packets.clear();
}

static class LastExceptionInStreamer {
class LastExceptionInStreamer {
private IOException thrown;

synchronized void set(Throwable t) {
Expand All @@ -191,7 +191,8 @@ synchronized void check(boolean resetToNull) throws IOException {
if (thrown != null) {
if (LOG.isTraceEnabled()) {
// wrap and print the exception to know when the check is called
LOG.trace("Got Exception while checking", new Throwable(thrown));
LOG.trace("Got Exception while checking, " + DataStreamer.this,
new Throwable(thrown));
}
final IOException e = thrown;
if (resetToNull) {
Expand Down Expand Up @@ -584,16 +585,13 @@ public void run() {
}

// get new block from namenode.
if (LOG.isDebugEnabled()) {
LOG.debug("stage=" + stage + ", " + this);
}
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(LOG.isDebugEnabled()) {
LOG.debug("Allocating new block " + this);
}
setPipeline(nextBlockOutputStream());
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
if(LOG.isDebugEnabled()) {
LOG.debug("Append to block " + block);
}
setupPipelineForAppendOrRecovery();
if (streamerClosed) {
continue;
Expand Down Expand Up @@ -639,8 +637,7 @@ public void run() {
}

if (LOG.isDebugEnabled()) {
LOG.debug("DataStreamer block " + block +
" sending packet " + one);
LOG.debug(this + " sending " + one);
}

// write out data to remote datanode
Expand Down Expand Up @@ -1426,16 +1423,21 @@ static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) {
/** update pipeline at the namenode */
ExtendedBlock updatePipeline(long newGS) throws IOException {
final ExtendedBlock newBlock = newBlock(block, newGS);
return callUpdatePipeline(block, newBlock);
return callUpdatePipeline(block, newBlock, nodes, storageIDs);
}

ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock)
ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock,
DatanodeInfo[] newNodes, String[] newStorageIDs)
throws IOException {
dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock,
nodes, storageIDs);
newNodes, newStorageIDs);
return newBlock;
}

/**
* Returns the block-write retry count read from the DFS client's
* configuration.
*
* @return the value of {@code dfsClient.getConf().getNumBlockWriteRetry()}
*/
int getNumBlockWriteRetry() {
return dfsClient.getConf().getNumBlockWriteRetry();
}

/**
* Open a DataStreamer to a DataNode so that it can be written to.
* This happens when a file is created and each time a new block is allocated.
Expand All @@ -1446,7 +1448,7 @@ private LocatedBlock nextBlockOutputStream() throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
StorageType[] storageTypes = null;
int count = dfsClient.getConf().getNumBlockWriteRetry();
int count = getNumBlockWriteRetry();
boolean success = false;
ExtendedBlock oldBlock = block;
do {
Expand Down Expand Up @@ -1502,7 +1504,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
String firstBadLink = "";
boolean checkRestart = false;
if (LOG.isDebugEnabled()) {
LOG.debug("pipeline = " + Arrays.asList(nodes));
LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
}

// persist blocks on namenode on next flush
Expand Down Expand Up @@ -1574,7 +1576,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
errorState.reset();
} catch (IOException ie) {
if (!errorState.isRestartingNode()) {
LOG.info("Exception in createBlockOutputStream", ie);
LOG.info("Exception in createBlockOutputStream " + this, ie);
}
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
LOG.info("Will fetch a new encryption key and retry, "
Expand Down Expand Up @@ -1649,7 +1651,7 @@ private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
}
}

protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
final DfsClientConf conf = dfsClient.getConf();
int retries = conf.getNumBlockWriteLocateFollowingRetry();
Expand Down Expand Up @@ -1755,6 +1757,10 @@ DatanodeInfo[] getNodes() {
return nodes;
}

/**
* Returns the storage IDs currently held by this streamer.
*
* @return the {@code storageIDs} reference as-is (no defensive copy is made)
*/
String[] getStorageIDs() {
return storageIDs;
}

/**
* return the token of the block
*
Expand Down Expand Up @@ -1933,7 +1939,6 @@ void closeSocket() throws IOException {

@Override
public String toString() {
return (block == null? null: block.getLocalBlock())
+ "@" + Arrays.toString(getNodes());
return block == null? "block==null": "" + block.getLocalBlock();
}
}

0 comments on commit 067ec8c

Please sign in to comment.