HDFS-8076. Code cleanup for DFSInputStream: use offset instead of LocatedBlock when possible. Contributed by Zhe Zhang.
umbrant committed Apr 8, 2015
1 parent 265ed1f commit a42bb1c
Showing 2 changed files with 24 additions and 19 deletions.
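The change is mechanical: methods that used to accept a LocatedBlock now accept the block's start offset as a long, and re-resolve the current LocatedBlock themselves via getBlockAt(offset). Below is a self-contained sketch of that pattern with hypothetical names (Block, getBlockAt, and fetchRangeOld/fetchRangeNew stand in for the real DFSInputStream code, which carries many more parameters); passing the primitive offset keeps callers from holding block metadata that may be refreshed underneath them, and makes the single lookup explicit at the point of use.

```java
// Sketch of the pattern this commit applies, with hypothetical names:
// Block and getBlockAt stand in for Hadoop's LocatedBlock and
// DFSInputStream#getBlockAt; the real methods take more parameters.
import java.util.NavigableMap;
import java.util.TreeMap;

class OffsetRefactorSketch {
  // Stand-in for LocatedBlock: block metadata keyed by file offset.
  record Block(long startOffset, long length) {}

  private final NavigableMap<Long, Block> blocks = new TreeMap<>();

  OffsetRefactorSketch() {
    blocks.put(0L, new Block(0L, 128L));
    blocks.put(128L, new Block(128L, 128L));
  }

  // Analogue of getBlockAt: resolve the block covering a file offset.
  Block getBlockAt(long offset) {
    return blocks.floorEntry(offset).getValue();
  }

  // Old shape: a possibly stale Block crosses the call boundary, and the
  // callee immediately overwrites the parameter with a fresh lookup.
  void fetchRangeOld(Block block, long start, long end) {
    block = getBlockAt(block.startOffset());
    System.out.println("read " + start + ".." + end + " from " + block);
  }

  // New shape: only the primitive offset crosses the boundary; the single
  // metadata lookup is explicit and local.
  void fetchRangeNew(long blockStartOffset, long start, long end) {
    Block block = getBlockAt(blockStartOffset);
    System.out.println("read " + start + ".." + end + " from " + block);
  }

  public static void main(String[] args) {
    OffsetRefactorSketch s = new OffsetRefactorSketch();
    s.fetchRangeNew(128L, 130L, 140L); // resolves the second block
  }
}
```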
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -396,6 +396,9 @@ Release 2.8.0 - UNRELEASED

HDFS-8046. Allow better control of getContentSummary (kihwal)

+ HDFS-8076. Code cleanup for DFSInputStream: use offset instead of
+ LocatedBlock when possible. (Zhe Zhang via wang)
+
OPTIMIZATIONS

HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
40 changes: 21 additions & 19 deletions hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1045,16 +1045,16 @@ private static String getBestNodeDNAddrPairErrorString(
return errMsgr.toString();
}

- private void fetchBlockByteRange(LocatedBlock block, long start, long end,
+ private void fetchBlockByteRange(long blockStartOffset, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
- block = getBlockAt(block.getStartOffset());
+ LocatedBlock block = getBlockAt(blockStartOffset);
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
try {
- actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
- corruptedBlockMap);
+ actualGetFromOneDataNode(addressPair, blockStartOffset, start, end,
+ buf, offset, corruptedBlockMap);
return;
} catch (IOException e) {
// Ignore. Already processed inside the function.
@@ -1064,7 +1064,7 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
}

private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
- final LocatedBlock block, final long start, final long end,
+ final long blockStartOffset, final long start, final long end,
final ByteBuffer bb,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final int hedgedReadId) {
@@ -1077,8 +1077,8 @@ public ByteBuffer call() throws Exception {
TraceScope scope =
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
try {
- actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
- corruptedBlockMap);
+ actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
+ offset, corruptedBlockMap);
return bb;
} finally {
scope.close();
@@ -1088,7 +1088,7 @@ public ByteBuffer call() throws Exception {
}

private void actualGetFromOneDataNode(final DNAddrPair datanode,
- LocatedBlock block, final long start, final long end, byte[] buf,
+ long blockStartOffset, final long start, final long end, byte[] buf,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
@@ -1101,7 +1101,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
// start of the loop.
CachingStrategy curCachingStrategy;
boolean allowShortCircuitLocalReads;
- block = getBlockAt(block.getStartOffset());
+ LocatedBlock block = getBlockAt(blockStartOffset);
synchronized(infoLock) {
curCachingStrategy = cachingStrategy;
allowShortCircuitLocalReads = !shortCircuitForbidden();
@@ -1189,7 +1189,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
* if the first read is taking longer than configured amount of
* time. We then wait on which ever read returns first.
*/
- private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
+ private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
@@ -1201,7 +1201,7 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
ByteBuffer bb = null;
int len = (int) (end - start + 1);
int hedgedReadId = 0;
- block = getBlockAt(block.getStartOffset());
+ LocatedBlock block = getBlockAt(blockStartOffset);
while (true) {
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
hedgedReadOpsLoopNumForTesting++;
@@ -1213,8 +1213,8 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
chosenNode = chooseDataNode(block, ignored);
bb = ByteBuffer.wrap(buf, offset, len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block, start, end, bb, corruptedBlockMap,
- hedgedReadId++);
+ chosenNode, block.getStartOffset(), start, end, bb,
+ corruptedBlockMap, hedgedReadId++);
Future<ByteBuffer> firstRequest = hedgedService
.submit(getFromDataNodeCallable);
futures.add(firstRequest);
@@ -1251,8 +1251,8 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
}
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block, start, end, bb, corruptedBlockMap,
- hedgedReadId++);
+ chosenNode, block.getStartOffset(), start, end, bb,
+ corruptedBlockMap, hedgedReadId++);
Future<ByteBuffer> oneMoreRequest = hedgedService
.submit(getFromDataNodeCallable);
futures.add(oneMoreRequest);
@@ -1405,11 +1405,13 @@ private int pread(long position, byte[] buffer, int offset, int length)
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
if (dfsClient.isHedgedReadsEnabled()) {
- hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
- - 1, buffer, offset, corruptedBlockMap);
+ hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
+ targetStart + bytesToRead - 1, buffer, offset,
+ corruptedBlockMap);
} else {
- fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
- buffer, offset, corruptedBlockMap);
+ fetchBlockByteRange(blk.getStartOffset(), targetStart,
+ targetStart + bytesToRead - 1, buffer, offset,
+ corruptedBlockMap);
}
} finally {
// Check and report if any block replicas are corrupted.
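Several of the touched methods implement hedged reads, described by the javadoc above hedgedFetchBlockByteRange: start a read against one replica and, if it has not finished within a configured delay, launch a second read against another replica and accept whichever returns first. Below is a minimal, self-contained sketch of that pattern (hypothetical helper names; the real loop additionally tracks ignored nodes, corrupted replicas, and the hedgedReadOpsLoopNumForTesting metric):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class HedgedReadSketch {
  // Returns the first successful result; submits a second attempt if the
  // first one has not finished within hedgeDelayMillis.
  static <T> T hedged(List<Callable<T>> replicas, long hedgeDelayMillis,
      ExecutorService pool) throws Exception {
    CompletionService<T> cs = new ExecutorCompletionService<>(pool);
    List<Future<T>> futures = new ArrayList<>();
    futures.add(cs.submit(replicas.get(0)));          // primary read
    Future<T> done = cs.poll(hedgeDelayMillis, TimeUnit.MILLISECONDS);
    if (done == null && replicas.size() > 1) {
      futures.add(cs.submit(replicas.get(1)));        // hedged read
      done = cs.take();                               // first to finish wins
    } else if (done == null) {
      done = cs.take();                               // no second replica
    }
    for (Future<T> f : futures) {                     // cancel the loser
      if (f != done) f.cancel(true);
    }
    return done.get();
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    Callable<String> slow = () -> { Thread.sleep(500); return "replica-1"; };
    Callable<String> fast = () -> { Thread.sleep(50);  return "replica-2"; };
    System.out.println(hedged(List.of(slow, fast), 100, pool)); // replica-2
    pool.shutdownNow();
  }
}
```

The losing future is identified by object identity and cancelled, much as the real loop cancels its remaining futures once one read completes.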
