Skip to content

Commit

Permalink
HDFS-9833. Erasure coding: recomputing block checksum on the fly by r…
Browse files Browse the repository at this point in the history
…econstructing the missed/corrupt block data. Contributed by Rakesh R.
  • Loading branch information
Kai Zheng committed Jun 2, 2016
1 parent 8ceb06e commit d749cf6
Show file tree
Hide file tree
Showing 16 changed files with 675 additions and 200 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -460,7 +460,8 @@ private boolean checksumBlockGroup(
setRemaining(getRemaining() - block.getNumBytes()); setRemaining(getRemaining() - block.getNumBytes());


StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block, StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy); blockGroup.getLocations(), blockGroup.getBlockTokens(),
blockGroup.getBlockIndices(), ecPolicy);
DatanodeInfo[] datanodes = blockGroup.getLocations(); DatanodeInfo[] datanodes = blockGroup.getLocations();


//try each datanode in the block group. //try each datanode in the block group.
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ public class StripedBlockInfo {
private final ExtendedBlock block; private final ExtendedBlock block;
private final DatanodeInfo[] datanodes; private final DatanodeInfo[] datanodes;
private final Token<BlockTokenIdentifier>[] blockTokens; private final Token<BlockTokenIdentifier>[] blockTokens;
private final byte[] blockIndices;
private final ErasureCodingPolicy ecPolicy; private final ErasureCodingPolicy ecPolicy;


/**
 * Builds the description of a striped block group. Every argument is stored
 * in the matching field exactly as passed in; no array is copied.
 *
 * @param block the block group
 * @param datanodes datanodes of the group
 * @param blockTokens block tokens, one per datanode entry
 * @param blockIndices block indices, one per datanode entry (the new-side
 *          parameter added by this change)
 * @param ecPolicy erasure coding policy of the block group
 */
public StripedBlockInfo(ExtendedBlock block, DatanodeInfo[] datanodes, public StripedBlockInfo(ExtendedBlock block, DatanodeInfo[] datanodes,
Token<BlockTokenIdentifier>[] blockTokens, Token<BlockTokenIdentifier>[] blockTokens, byte[] blockIndices,
ErasureCodingPolicy ecPolicy) { ErasureCodingPolicy ecPolicy) {
this.block = block; this.block = block;
this.datanodes = datanodes; this.datanodes = datanodes;
this.blockTokens = blockTokens; this.blockTokens = blockTokens;
this.blockIndices = blockIndices;
this.ecPolicy = ecPolicy; this.ecPolicy = ecPolicy;
} }


Expand All @@ -55,6 +57,10 @@ public Token<BlockTokenIdentifier>[] getBlockTokens() {
return blockTokens; return blockTokens;
} }


/**
 * Returns the block-index array held by this object.
 *
 * @return the stored array itself, not a copy
 */
public byte[] getBlockIndices() {
return blockIndices;
}

public ErasureCodingPolicy getErasureCodingPolicy() { public ErasureCodingPolicy getErasureCodingPolicy() {
return ecPolicy; return ecPolicy;
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
stripedBlockInfo.getDatanodes())) stripedBlockInfo.getDatanodes()))
.addAllBlockTokens(PBHelperClient.convert( .addAllBlockTokens(PBHelperClient.convert(
stripedBlockInfo.getBlockTokens())) stripedBlockInfo.getBlockTokens()))
.addAllBlockIndices(PBHelperClient
.convertBlockIndices(stripedBlockInfo.getBlockIndices()))
.setEcPolicy(PBHelperClient.convertErasureCodingPolicy( .setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
stripedBlockInfo.getErasureCodingPolicy())) stripedBlockInfo.getErasureCodingPolicy()))
.build(); .build();
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -853,6 +853,22 @@ public static List<TokenProto> convert(
return results; return results;
} }


/**
 * Widens each block index from {@code byte} to {@code Integer}, keeping the
 * original order. The widening is sign-extending, so a negative byte yields
 * the same negative value as an integer.
 *
 * @param blockIndices block indices to convert; must not be null
 * @return a new list with one entry per array element
 */
public static List<Integer> convertBlockIndices(byte[] blockIndices) {
  final int count = blockIndices.length;
  final List<Integer> widened = new ArrayList<>(count);
  for (int pos = 0; pos < count; pos++) {
    // Cast to int first so the value is boxed as an Integer.
    widened.add((int) blockIndices[pos]);
  }
  return widened;
}

/**
 * Narrows each integer block index to a {@code byte}, keeping the original
 * order. Only the low eight bits of each value survive the narrowing.
 *
 * @param blockIndices block indices to convert; neither the list nor any
 *          element may be null
 * @return a new array with one byte per list entry
 */
public static byte[] convertBlockIndices(List<Integer> blockIndices) {
  final byte[] narrowed = new byte[blockIndices.size()];
  int cursor = 0;
  for (Integer index : blockIndices) {
    narrowed[cursor++] = index.byteValue();
  }
  return narrowed;
}

public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) { public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
List<StorageTypeProto> cList = proto.getCreationPolicy() List<StorageTypeProto> cList = proto.getCreationPolicy()
.getStorageTypesList(); .getStorageTypesList();
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ message OpBlockGroupChecksumProto {
// each internal block has a block token // each internal block has a block token
repeated hadoop.common.TokenProto blockTokens = 3; repeated hadoop.common.TokenProto blockTokens = 3;
required ErasureCodingPolicyProto ecPolicy = 4; required ErasureCodingPolicyProto ecPolicy = 4;
repeated uint32 blockIndices = 5;
} }


/** /**
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
PBHelperClient.convert(proto.getHeader().getBlock()), PBHelperClient.convert(proto.getHeader().getBlock()),
PBHelperClient.convert(proto.getDatanodes()), PBHelperClient.convert(proto.getDatanodes()),
PBHelperClient.convertTokens(proto.getBlockTokensList()), PBHelperClient.convertTokens(proto.getBlockTokensList()),
PBHelperClient.convertBlockIndices(proto.getBlockIndicesList()),
PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy()) PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy())
); );


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
Expand All @@ -30,6 +32,8 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumReconstructor;
import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedReconstructionInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
Expand All @@ -46,11 +50,14 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;


/** /**
* Utilities for Block checksum computing, for both replicated and striped * Utilities for Block checksum computing, for both replicated and striped
* blocks. * blocks.
*/ */
@InterfaceAudience.Private
final class BlockChecksumHelper { final class BlockChecksumHelper {


static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class); static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
Expand Down Expand Up @@ -327,6 +334,7 @@ static class BlockGroupNonStripedChecksumComputer
private final ErasureCodingPolicy ecPolicy; private final ErasureCodingPolicy ecPolicy;
private final DatanodeInfo[] datanodes; private final DatanodeInfo[] datanodes;
private final Token<BlockTokenIdentifier>[] blockTokens; private final Token<BlockTokenIdentifier>[] blockTokens;
private final byte[] blockIndices;


private final DataOutputBuffer md5writer = new DataOutputBuffer(); private final DataOutputBuffer md5writer = new DataOutputBuffer();


Expand All @@ -338,17 +346,61 @@ static class BlockGroupNonStripedChecksumComputer
this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy(); this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
this.datanodes = stripedBlockInfo.getDatanodes(); this.datanodes = stripedBlockInfo.getDatanodes();
this.blockTokens = stripedBlockInfo.getBlockTokens(); this.blockTokens = stripedBlockInfo.getBlockTokens();
this.blockIndices = stripedBlockInfo.getBlockIndices();
}

/**
 * Immutable holder pairing a datanode with a block token. compute() builds
 * one instance per entry of the datanodes/blockTokens arrays and keys it by
 * the block index found at the same array position.
 */
private static class LiveBlockInfo {
// Datanode of this entry.
private final DatanodeInfo dn;
// Block token taken from the same array position as the datanode.
private final Token<BlockTokenIdentifier> token;

LiveBlockInfo(DatanodeInfo dn, Token<BlockTokenIdentifier> token) {
this.dn = dn;
this.token = token;
}

/** @return the datanode given at construction. */
DatanodeInfo getDn() {
return dn;
}

/** @return the block token given at construction. */
Token<BlockTokenIdentifier> getToken() {
return token;
}
} }


@Override @Override
void compute() throws IOException { void compute() throws IOException {
for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) { assert datanodes.length == blockIndices.length;
ExtendedBlock block =
StripedBlockUtil.constructInternalBlock(blockGroup, Map<Byte, LiveBlockInfo> liveDns = new HashMap<>(datanodes.length);
ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx); int blkIndxLen = blockIndices.length;
DatanodeInfo targetDatanode = datanodes[idx]; int numDataUnits = ecPolicy.getNumDataUnits();
Token<BlockTokenIdentifier> blockToken = blockTokens[idx]; // Prepare live datanode list. Missing data blocks will be reconstructed
checksumBlock(block, idx, blockToken, targetDatanode); // and recalculate checksum.
for (int idx = 0; idx < blkIndxLen; idx++) {
liveDns.put(blockIndices[idx],
new LiveBlockInfo(datanodes[idx], blockTokens[idx]));
}
for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) {
try {
LiveBlockInfo liveBlkInfo = liveDns.get((byte) idx);
if (liveBlkInfo == null) {
// reconstruct block and calculate checksum for missing node
recalculateChecksum(idx);
} else {
try {
ExtendedBlock block = StripedBlockUtil.constructInternalBlock(
blockGroup, ecPolicy.getCellSize(), numDataUnits, idx);
checksumBlock(block, idx, liveBlkInfo.getToken(),
liveBlkInfo.getDn());
} catch (IOException ioe) {
LOG.warn("Exception while reading checksum", ioe);
// reconstruct block and calculate checksum for the failed node
recalculateChecksum(idx);
}
}
} catch (IOException e) {
LOG.warn("Failed to get the checksum", e);
}
} }


MD5Hash md5out = MD5Hash.digest(md5writer.getData()); MD5Hash md5out = MD5Hash.digest(md5writer.getData());
Expand Down Expand Up @@ -379,52 +431,90 @@ private void checksumBlock(ExtendedBlock block, int blockIdx,
DataTransferProtos.OpBlockChecksumResponseProto checksumData = DataTransferProtos.OpBlockChecksumResponseProto checksumData =
reply.getChecksumResponse(); reply.getChecksumResponse();


//read byte-per-checksum
final int bpc = checksumData.getBytesPerCrc();
if (blockIdx == 0) { //first block
setBytesPerCRC(bpc);
} else if (bpc != getBytesPerCRC()) {
throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+ " but bytesPerCRC=" + getBytesPerCRC());
}

//read crc-per-block
final long cpb = checksumData.getCrcPerBlock();
if (blockIdx == 0) {
setCrcPerBlock(cpb);
}

//read md5
final MD5Hash md5 = new MD5Hash(
checksumData.getMd5().toByteArray());
md5.write(md5writer);

// read crc-type // read crc-type
final DataChecksum.Type ct; final DataChecksum.Type ct;
if (checksumData.hasCrcType()) { if (checksumData.hasCrcType()) {
ct = PBHelperClient.convert(checksumData.getCrcType()); ct = PBHelperClient.convert(checksumData.getCrcType());
} else { } else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " + LOG.debug("Retrieving checksum from an earlier-version DataNode: "
"inferring checksum by reading first byte"); + "inferring checksum by reading first byte");
ct = DataChecksum.Type.DEFAULT; ct = DataChecksum.Type.DEFAULT;
} }


if (blockIdx == 0) { // first block setOrVerifyChecksumProperties(blockIdx, checksumData.getBytesPerCrc(),
setCrcType(ct); checksumData.getCrcPerBlock(), ct);
} else if (getCrcType() != DataChecksum.Type.MIXED && //read md5
getCrcType() != ct) { final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
// if crc types are mixed in a file md5.write(md5writer);
setCrcType(DataChecksum.Type.MIXED);
}

if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
if (blockIdx == 0) {
LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+ ", crcPerBlock=" + getCrcPerBlock());
}
LOG.debug("got reply from " + targetDatanode + ": md5=" + md5); LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
} }
} }
} }

/**
 * Runs a {@link StripedBlockChecksumReconstructor} for a single block index
 * and feeds the checksum properties it reports into
 * {@code setOrVerifyChecksumProperties}. The reconstructor is handed the
 * shared {@code md5writer}.
 *
 * @param errBlkIndex
 *          index of the block to be reconstructed and re-checksummed.
 * @throws IOException
 *           if the reconstruction or the checksum property check fails.
 */
private void recalculateChecksum(int errBlkIndex) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Recalculate checksum for the missing/failed block index "
        + errBlkIndex);
  }

  // Exactly one block index is requested per call.
  final byte[] targetIndices = {(byte) errBlkIndex};
  final StripedReconstructionInfo reconInfo = new StripedReconstructionInfo(
      blockGroup, ecPolicy, blockIndices, datanodes, targetIndices);
  final StripedBlockChecksumReconstructor reconstructor =
      new StripedBlockChecksumReconstructor(
          getDatanode().getErasureCodingWorker(), reconInfo, md5writer);
  reconstructor.reconstruct();

  final DataChecksum rebuilt = reconstructor.getChecksum();
  final long crcPerBlock;
  if (rebuilt.getChecksumSize() <= 0) {
    // A non-positive checksum size cannot be divided by; report zero CRCs.
    crcPerBlock = 0;
  } else {
    crcPerBlock =
        reconstructor.getChecksumDataLen() / rebuilt.getChecksumSize();
  }
  setOrVerifyChecksumProperties(errBlkIndex, rebuilt.getBytesPerChecksum(),
      crcPerBlock, rebuilt.getChecksumType());

  if (LOG.isDebugEnabled()) {
    LOG.debug("Recalculated checksum for the block index " + errBlkIndex
        + ": md5=" + reconstructor.getMD5());
  }
}

/**
 * Records the checksum properties when called for block index 0 and
 * reconciles the properties of any other block against the recorded ones.
 *
 * <p>For a non-zero index, bytes-per-checksum must equal the recorded value,
 * a differing CRC type switches the recorded type to
 * {@code DataChecksum.Type.MIXED}, and {@code cpb} is not used.
 *
 * @param blockIdx index of the block the properties belong to
 * @param bpc bytes-per-checksum reported for the block
 * @param cpb crc-per-block reported for the block
 * @param ct checksum type reported for the block
 * @throws IOException if {@code blockIdx} is not 0 and {@code bpc} differs
 *           from the recorded bytes-per-checksum
 */
private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
    final long cpb, DataChecksum.Type ct) throws IOException {
  if (blockIdx != 0) {
    // Later blocks: bytes-per-checksum has to match what is recorded.
    if (bpc != getBytesPerCRC()) {
      throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
          + " but bytesPerCRC=" + getBytesPerCRC());
    }
    // Once the type is MIXED it stays MIXED; otherwise any disagreement
    // with the recorded type makes it MIXED.
    if (!(getCrcType() == DataChecksum.Type.MIXED || getCrcType() == ct)) {
      setCrcType(DataChecksum.Type.MIXED);
    }
    return;
  }

  // First block: its values become the recorded ones.
  setBytesPerCRC(bpc);
  setCrcPerBlock(cpb);
  setCrcType(ct);
  if (LOG.isDebugEnabled()) {
    LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
        + ", crcPerBlock=" + getCrcPerBlock());
  }
}
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -116,19 +116,24 @@ public Thread newThread(Runnable r) {
*/ */
public void processErasureCodingTasks( public void processErasureCodingTasks(
Collection<BlockECReconstructionInfo> ecTasks) { Collection<BlockECReconstructionInfo> ecTasks) {
// Each task gets its own try block, so a failure is logged and the loop
// moves on to the next task.
for (BlockECReconstructionInfo reconstructionInfo : ecTasks) { for (BlockECReconstructionInfo reconInfo : ecTasks) {
try { try {
final StripedReconstructor task = StripedReconstructionInfo stripedReconInfo =
new StripedReconstructor(this, reconstructionInfo); new StripedReconstructionInfo(
reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes());
final StripedBlockReconstructor task =
new StripedBlockReconstructor(this, stripedReconInfo);
// Only tasks reporting valid targets are handed to the pool; the rest
// are skipped with a warning.
if (task.hasValidTargets()) { if (task.hasValidTargets()) {
stripedReconstructionPool.submit(task); stripedReconstructionPool.submit(task);
} else { } else {
LOG.warn("No missing internal block. Skip reconstruction for task:{}", LOG.warn("No missing internal block. Skip reconstruction for task:{}",
reconstructionInfo); reconInfo);
} }
// Throwable is caught on purpose here: nothing raised while setting up or
// submitting a task escapes this method.
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Failed to reconstruct striped block {}", LOG.warn("Failed to reconstruct striped block {}",
reconstructionInfo.getExtendedBlock().getLocalBlock(), e); reconInfo.getExtendedBlock().getLocalBlock(), e);
} }
} }
} }
Expand Down
Loading

0 comments on commit d749cf6

Please sign in to comment.