Skip to content

Commit

Permalink
HDFS-7936. Erasure coding: resolving conflicts in the branch when mer…
Browse files Browse the repository at this point in the history
…ging trunk changes (this commit mainly addresses HDFS-8081 and HDFS-8048. Contributed by Zhe Zhang.
  • Loading branch information
zhe-thoughts authored and Zhe Zhang committed May 26, 2015
1 parent f53e402 commit 35797b0
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 11 deletions.
Expand Up @@ -1109,7 +1109,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final int length = (int) (end - start + 1);
actualGetFromOneDataNode(datanode, block, start, end, buf,
actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
new int[]{offset}, new int[]{length}, corruptedBlockMap);
}

Expand All @@ -1128,7 +1128,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
* block replica
*/
void actualGetFromOneDataNode(final DNAddrPair datanode,
LocatedBlock block, final long startInBlk, final long endInBlk,
long blockStartOffset, final long startInBlk, final long endInBlk,
byte[] buf, int[] offsets, int[] lengths,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
Expand Down
Expand Up @@ -224,7 +224,7 @@ private LocatedBlock getBlockGroupAt(long offset) throws IOException {
* Real implementation of pread.
*/
@Override
protected void fetchBlockByteRange(LocatedBlock block, long start,
protected void fetchBlockByteRange(long blockStartOffset, long start,
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
Expand All @@ -234,7 +234,7 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
int len = (int) (end - start + 1);

// Refresh the striped block group
block = getBlockGroupAt(block.getStartOffset());
LocatedBlock block = getBlockGroupAt(blockStartOffset);
assert block instanceof LocatedStripedBlock : "NameNode" +
" should return a LocatedStripedBlock for a striped file";
LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
Expand All @@ -254,9 +254,11 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
DatanodeInfo loc = blks[i].getLocations()[0];
StorageType type = blks[i].getStorageTypes()[0];
DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type);
Callable<Void> readCallable = getFromOneDataNode(dnAddr, blks[i],
rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf,
loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
type);
Callable<Void> readCallable = getFromOneDataNode(dnAddr,
blks[i].getStartOffset(), rp.startOffsetInBlock,
rp.startOffsetInBlock + rp.readLength - 1, buf,
rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
Expand All @@ -272,7 +274,7 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
}

private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
final LocatedBlock block, final long start, final long end,
final long blockStartOffset, final long start, final long end,
final byte[] buf, final int[] offsets, final int[] lengths,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final int hedgedReadId) {
Expand All @@ -283,7 +285,7 @@ public Void call() throws Exception {
TraceScope scope =
Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
try {
actualGetFromOneDataNode(datanode, block, start,
actualGetFromOneDataNode(datanode, blockStartOffset, start,
end, buf, offsets, lengths, corruptedBlockMap);
} finally {
scope.close();
Expand Down
Expand Up @@ -284,7 +284,8 @@ synchronized void abort() throws IOException {
}
for (StripedDataStreamer streamer : streamers) {
streamer.setLastException(new IOException("Lease timeout of "
+ (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
+ (dfsClient.getConf().getHdfsTimeout()/1000) +
" seconds expired."));
}
closeThreads(true);
dfsClient.endFileLease(fileId);
Expand Down
Expand Up @@ -5,6 +5,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
Expand Down Expand Up @@ -241,7 +242,7 @@ private void testOneFile(String src, int writeBytes)
}

block.setNumBytes(lenOfBlock);
BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
setFileName(src).
setBlock(block).
setBlockToken(lblock.getBlockToken()).
Expand Down

0 comments on commit 35797b0

Please sign in to comment.