HDFS-8468. 2 RPC calls for every file read in DFSClient#open(..) resulting in double Audit log entries (Contributed by Vinayakumar B)
vinayakumarb committed Jun 30, 2015
1 parent 093907d commit 0b7af27
Showing 14 changed files with 93 additions and 38 deletions.
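
In short: before this patch, DFSClient#open(..) issued two RPCs per file read (getFileInfo(src) to learn the erasure-coding schema, then getLocatedBlocks(src, 0)), and each RPC produced its own audit log entry. The fix piggybacks the ECSchema and stripe cell size on the LocatedBlocks response, so a single getBlockLocations RPC serves both purposes. A minimal sketch of the resulting call pattern, using simplified stand-in types rather than the real Hadoop classes:

import java.io.IOException;

public class SingleRpcOpenSketch {

  static final class EcSchema {}            // stand-in for ECSchema

  // Stand-in for LocatedBlocks: after this patch it also carries EC metadata.
  static final class Blocks {
    final EcSchema schema;                  // null for a non-EC file
    final int stripeCellSize;               // 0 for a non-EC file
    Blocks(EcSchema schema, int stripeCellSize) {
      this.schema = schema;
      this.stripeCellSize = stripeCellSize;
    }
  }

  interface NameNode {
    Blocks getLocatedBlocks(String src);    // each call is one audited RPC
  }

  // Before: open() called getFileInfo() and then getLocatedBlocks(),
  // i.e. two RPCs and two audit entries. After: one call, whose response
  // already says whether the file is erasure-coded.
  static String open(NameNode nn, String src) throws IOException {
    Blocks blocks = nn.getLocatedBlocks(src);   // the only RPC
    if (blocks == null) {
      throw new IOException("Cannot open filename " + src);
    }
    return blocks.schema != null
        ? "striped reader, cellSize=" + blocks.stripeCellSize
        : "plain reader";
  }

  public static void main(String[] args) throws IOException {
    NameNode nn = src -> src.startsWith("/ec/")
        ? new Blocks(new EcSchema(), 64 * 1024)
        : new Blocks(null, 0);
    System.out.println(open(nn, "/ec/file"));     // striped reader, cellSize=65536
    System.out.println(open(nn, "/data/file"));   // plain reader
  }
}
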
@@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.io.erasurecode.ECSchema;

/**
* Collection of blocks with their locations and the file length.
@@ -37,6 +38,8 @@ public class LocatedBlocks {
private final LocatedBlock lastLocatedBlock;
private final boolean isLastBlockComplete;
private final FileEncryptionInfo fileEncryptionInfo;
private final ECSchema ecSchema;
private final int stripeCellSize;

public LocatedBlocks() {
fileLength = 0;
@@ -45,17 +48,22 @@ public LocatedBlocks() {
lastLocatedBlock = null;
isLastBlockComplete = false;
fileEncryptionInfo = null;
ecSchema = null;
stripeCellSize = 0;
}

public LocatedBlocks(long flength, boolean isUnderConstuction,
List<LocatedBlock> blks, LocatedBlock lastBlock,
boolean isLastBlockCompleted, FileEncryptionInfo feInfo) {
List<LocatedBlock> blks, LocatedBlock lastBlock,
boolean isLastBlockCompleted, FileEncryptionInfo feInfo,
ECSchema ecSchema, int stripeCellSize) {
fileLength = flength;
blocks = blks;
underConstruction = isUnderConstuction;
this.lastLocatedBlock = lastBlock;
this.isLastBlockComplete = isLastBlockCompleted;
this.fileEncryptionInfo = feInfo;
this.ecSchema = ecSchema;
this.stripeCellSize = stripeCellSize;
}

/**
@@ -111,6 +119,20 @@ public FileEncryptionInfo getFileEncryptionInfo() {
return fileEncryptionInfo;
}

/**
* @return the ECSchema for an erasure-coded file, null otherwise.
*/
public ECSchema getECSchema() {
return ecSchema;
}

/**
* @return the stripe cell size for an erasure-coded file, 0 otherwise.
*/
public int getStripeCellSize() {
return stripeCellSize;
}

/**
* Find block containing specified offset.
*
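
Before moving on to the converter changes, a small hypothetical usage of the widened constructor, with the signature taken from the hunk above; a non-erasure-coded, non-encrypted file passes null for the schema and 0 for the cell size, matching the getter javadocs:

import java.util.Collections;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

class LocatedBlocksUsage {
  // An empty, non-EC, non-encrypted file: the two new trailing arguments
  // carry the erasure-coding metadata that used to need a separate RPC.
  static LocatedBlocks emptyNonEcFile() {
    return new LocatedBlocks(0L, false,
        Collections.<LocatedBlock>emptyList(), null /* lastBlock */,
        false /* isLastBlockComplete */, null /* feInfo */,
        null /* ecSchema */, 0 /* stripeCellSize */);
  }
}
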
@@ -479,7 +479,7 @@ static LocatedBlocks toLocatedBlocks(
(Map<?, ?>) m.get("lastLocatedBlock"));
final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
lastLocatedBlock, isLastBlockComplete, null);
lastLocatedBlock, isLastBlockComplete, null, null, 0);
}

}
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -323,3 +323,6 @@

HDFS-8253. DFSStripedOutputStream.closeThreads releases cellBuffers
multiple times. (Kai Sasaki via szetszwo)

HDFS-8468. 2 RPC calls for every file read in DFSClient#open(..) resulting in
double Audit log entries (vinayakumarb)
@@ -1192,15 +1192,17 @@ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
// Get block info from namenode
TraceScope scope = getPathTraceScope("newDFSInputStream", src);
try {
HdfsFileStatus fileInfo = getFileInfo(src);
if (fileInfo != null) {
ECSchema schema = fileInfo.getECSchema();
LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
if (locatedBlocks != null) {
ECSchema schema = locatedBlocks.getECSchema();
if (schema != null) {
return new DFSStripedInputStream(this, src, verifyChecksum, schema,
fileInfo.getStripeCellSize());
locatedBlocks.getStripeCellSize(), locatedBlocks);
}
return new DFSInputStream(this, src, verifyChecksum, locatedBlocks);
} else {
throw new IOException("Cannot open filename " + src);
}
return new DFSInputStream(this, src, verifyChecksum);
} finally {
scope.close();
}
@@ -253,24 +253,28 @@ void addToDeadNodes(DatanodeInfo dnInfo) {
deadNodes.put(dnInfo, dnInfo);
}

DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
) throws IOException, UnresolvedLinkException {
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized (infoLock) {
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
}
openInfo();
this.locatedBlocks = locatedBlocks;
openInfo(false);
}

/**
* Grab the open-file info from the namenode.
* @param refreshLocatedBlocks whether to re-fetch the located blocks
*/
void openInfo() throws IOException, UnresolvedLinkException {
void openInfo(boolean refreshLocatedBlocks) throws IOException,
UnresolvedLinkException {
final DfsClientConf conf = dfsClient.getConf();
synchronized(infoLock) {
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
lastBlockBeingWrittenLength =
fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
while (retriesForLastBlockLength > 0) {
// Getting last block length as -1 is a special case. When cluster
@@ -282,7 +286,8 @@ void openInfo() throws IOException, UnresolvedLinkException {
+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
waitFor(conf.getRetryIntervalForGetLastBlockLength());
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
lastBlockBeingWrittenLength =
fetchLocatedBlocksAndGetLastBlockLength(true);
} else {
break;
}
@@ -303,8 +308,12 @@ private void waitFor(int waitTime) throws IOException {
}
}

private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
throws IOException {
LocatedBlocks newInfo = locatedBlocks;
if (locatedBlocks == null || refresh) {
newInfo = dfsClient.getLocatedBlocks(src, 0);
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("newInfo = " + newInfo);
}
@@ -1015,7 +1024,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
} catch (InterruptedException iex) {
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
openInfo();
openInfo(true);
block = refreshLocatedBlock(block);
failures++;
}
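
The key behavioral change in DFSInputStream: the constructor can now be seeded with the LocatedBlocks that open() already fetched, and openInfo(false) reuses them instead of issuing a second getBlockLocations RPC, while retry and error paths call openInfo(true) to force a re-fetch. A tiny generic sketch of that reuse-unless-refresh pattern (hypothetical helper, not Hadoop code):

import java.util.function.Supplier;

final class RefreshableRef<T> {
  private T cached;                  // may be seeded up front, as open() does
  private final Supplier<T> fetch;   // the expensive, audited namenode RPC

  RefreshableRef(T seed, Supplier<T> fetch) {
    this.cached = seed;
    this.fetch = fetch;
  }

  // Mirrors fetchLocatedBlocksAndGetLastBlockLength(boolean refresh):
  // go to the namenode only if nothing is cached or a refresh is forced.
  T get(boolean refresh) {
    if (cached == null || refresh) {
      cached = fetch.get();
    }
    return cached;
  }
}

Seeding the cache from open() is what removes the second RPC; passing true on the retry path (as chooseDataNode does above) preserves the old re-fetch behavior.
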
@@ -24,6 +24,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
@@ -139,9 +140,10 @@ boolean include(long pos) {
private final CompletionService<Void> readingService;
private ReaderRetryPolicy retry;

DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
ECSchema schema, int cellSize) throws IOException {
super(dfsClient, src, verifyChecksum);
DFSStripedInputStream(DFSClient dfsClient, String src,
boolean verifyChecksum, ECSchema schema, int cellSize,
LocatedBlocks locatedBlocks) throws IOException {
super(dfsClient, src, verifyChecksum, locatedBlocks);

assert schema != null;
this.schema = schema;
@@ -1338,8 +1338,9 @@ public static LocatedBlocks convert(LocatedBlocksProto lb) {
lb.hasLastBlock() ?
PBHelper.convertLocatedBlockProto(lb.getLastBlock()) : null,
lb.getIsLastBlockComplete(),
lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) :
null);
lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) : null,
lb.hasECSchema() ? convertECSchema(lb.getECSchema()) : null,
lb.hasStripeCellSize() ? lb.getStripeCellSize() : 0);
}

public static LocatedBlocksProto convert(LocatedBlocks lb) {
@@ -1355,6 +1356,12 @@ public static LocatedBlocksProto convert(LocatedBlocks lb) {
if (lb.getFileEncryptionInfo() != null) {
builder.setFileEncryptionInfo(convert(lb.getFileEncryptionInfo()));
}
if (lb.getECSchema() != null) {
builder.setECSchema(convertECSchema(lb.getECSchema()));
}
if (lb.getStripeCellSize() != 0) {
builder.setStripeCellSize(lb.getStripeCellSize());
}
return builder.setFileLength(lb.getFileLength())
.setUnderConstruction(lb.isUnderConstruction())
.addAllBlocks(PBHelper.convertLocatedBlocks2(lb.getLocatedBlocks()))
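
PBHelper treats the two new proto fields as genuinely optional: a message from an older server simply lacks them, and the reader falls back to null and 0. A minimal sketch of that presence-check idiom, with a hypothetical stand-in for the generated proto class:

final class LocatedBlocksProtoLite {
  private final Integer stripeCellSize;   // null models "absent on the wire"
  LocatedBlocksProtoLite(Integer stripeCellSize) {
    this.stripeCellSize = stripeCellSize;
  }
  boolean hasStripeCellSize() { return stripeCellSize != null; }
  // Like generated proto2 code, the getter yields the field default when unset.
  int getStripeCellSize() { return stripeCellSize == null ? 0 : stripeCellSize; }
}

final class ProtoDefaults {
  // Mirrors "lb.hasStripeCellSize() ? lb.getStripeCellSize() : 0" in the hunk
  // above: an absent optional field decays to a safe default, which keeps old
  // and new clients and servers wire-compatible.
  static int cellSizeOf(LocatedBlocksProtoLite lb) {
    return lb.hasStripeCellSize() ? lb.getStripeCellSize() : 0;
  }
}
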
@@ -938,14 +938,18 @@ public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
final long fileSizeExcludeBlocksUnderConstruction,
final boolean isFileUnderConstruction, final long offset,
final long length, final boolean needBlockToken,
final boolean inSnapshot, FileEncryptionInfo feInfo)
final boolean inSnapshot, FileEncryptionInfo feInfo,
ErasureCodingZone ecZone)
throws IOException {
assert namesystem.hasReadLock();
final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
if (blocks == null) {
return null;
} else if (blocks.length == 0) {
return new LocatedBlocks(0, isFileUnderConstruction,
Collections.<LocatedBlock>emptyList(), null, false, feInfo);
Collections.<LocatedBlock> emptyList(), null, false, feInfo, schema,
cellSize);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -968,9 +972,9 @@ public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
fileSizeExcludeBlocksUnderConstruction, mode);
isComplete = true;
}
return new LocatedBlocks(
fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
locatedblocks, lastlb, isComplete, feInfo);
return new LocatedBlocks(fileSizeExcludeBlocksUnderConstruction,
isFileUnderConstruction, locatedblocks, lastlb, isComplete, feInfo,
schema, cellSize);
}
}

@@ -445,6 +445,8 @@ private static HdfsLocatedFileStatus createLocatedFileStatus(
final boolean isEncrypted;
final FileEncryptionInfo feInfo = isRawPath ? null :
fsd.getFileEncryptionInfo(node, snapshot, iip);
final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
fsd.getFSNamesystem(), iip);
if (node.isFile()) {
final INodeFile fileNode = node.asFile();
size = fileNode.computeFileSize(snapshot);
@@ -458,7 +460,7 @@

loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
inSnapshot, feInfo);
inSnapshot, feInfo, ecZone);
if (loc == null) {
loc = new LocatedBlocks();
}
@@ -469,8 +471,6 @@
}
int childrenNum = node.isDirectory() ?
node.asDirectory().getChildrenNum(snapshot) : 0;
final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
fsd.getFSNamesystem(), iip);
final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;

@@ -1873,10 +1873,12 @@ private GetBlockLocationsResult getBlockLocationsInt(
final FileEncryptionInfo feInfo =
FSDirectory.isReservedRawName(srcArg) ? null
: dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
this, iip);

final LocatedBlocks blocks = blockManager.createLocatedBlocks(
inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
length, needBlockToken, iip.isSnapshot(), feInfo);
length, needBlockToken, iip.isSnapshot(), feInfo, ecZone);

// Set caching information for the located blocks.
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
4 changes: 4 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -301,6 +301,10 @@ message LocatedBlocksProto {
optional LocatedBlockProto lastBlock = 4;
required bool isLastBlockComplete = 5;
optional FileEncryptionInfoProto fileEncryptionInfo = 6;

// Optional fields for erasure coding
optional ECSchemaProto eCSchema = 7;
optional uint32 stripeCellSize = 8;
}

/**
@@ -343,7 +343,7 @@ public void testFailuresArePerOperation() throws Exception
// we're starting a new operation on the user level.
doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
.when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
is.openInfo();
is.openInfo(true);
// Seek to beginning forces a reopen of the BlockReader - otherwise it'll
// just keep reading on the existing stream and the fact that we've poisoned
// the block info won't do anything.
@@ -496,7 +496,7 @@ private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
badBlocks.add(badLocatedBlock);
return new LocatedBlocks(goodBlockList.getFileLength(), false,
badBlocks, null, true,
null);
null, null, 0);
}
}

@@ -101,7 +101,7 @@ public void testRefreshBlock() throws Exception {
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
filePath.toString(), false, schema, CELLSIZE);
filePath.toString(), false, schema, CELLSIZE, null);

List<LocatedBlock> lbList = lbs.getLocatedBlocks();
for (LocatedBlock aLbList : lbList) {
@@ -153,7 +153,7 @@ public void testPread() throws Exception {
}
}
DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
filePath.toString(), false, schema, CELLSIZE);
filePath.toString(), false, schema, CELLSIZE, null);

int[] startOffsets = {0, 1, CELLSIZE - 102, CELLSIZE, CELLSIZE + 102,
CELLSIZE*DATA_BLK_NUM, CELLSIZE*DATA_BLK_NUM + 102,
@@ -195,7 +195,7 @@ public void testPreadWithDNFailure() throws Exception {
}
DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE);
ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE, null);
int readSize = BLOCK_GROUP_SIZE;
byte[] readBuffer = new byte[readSize];
byte[] expected = new byte[readSize];
@@ -293,7 +293,7 @@ private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)

DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(),
false, schema, CELLSIZE);
false, schema, CELLSIZE, null);

byte[] expected = new byte[fileSize];

@@ -110,7 +110,7 @@ public void testLocatedBlocks2Locations() {
l2.setCorrupt(true);

List<LocatedBlock> ls = Arrays.asList(l1, l2);
LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null);
LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null, null, 0);

BlockLocation[] bs = DFSUtilClient.locatedBlocks2Locations(lbs);
