Skip to content

Commit

Permalink
HDFS-11382. Persist Erasure Coding Policy ID in a new optional field …
Browse files Browse the repository at this point in the history
…in INodeFile in FSImage. Contributed by Manoj Govindassamy.
  • Loading branch information
umbrant committed Feb 28, 2017
1 parent 5f5b031 commit 55c07bb
Show file tree
Hide file tree
Showing 17 changed files with 313 additions and 72 deletions.
Expand Up @@ -79,7 +79,7 @@ public HdfsFileStatus(long length, boolean isdir, int block_replication,
byte storagePolicy, ErasureCodingPolicy ecPolicy) {
this.length = length;
this.isdir = isdir;
this.block_replication = ecPolicy == null ? (short) block_replication : 0;
this.block_replication = (short) block_replication;
this.blocksize = blocksize;
this.modification_time = modification_time;
this.access_time = access_time;
Expand Down
Expand Up @@ -234,4 +234,10 @@
<Method name="assertAllResultsEqual" />
<Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
</Match>
<!-- Manually verified that the signed byte value involved in the bitwise OR is never negative -->
<Match>
<Class name="org.apache.hadoop.hdfs.server.namenode.INodeFile$HeaderFormat" />
<Method name="getBlockLayoutRedundancy" />
<Bug pattern="BIT_IOR_OF_SIGNED_BYTE" />
</Match>
</FindBugsFilter>
Expand Up @@ -416,21 +416,25 @@ static INodeFile addFileForEditLog(
assert fsd.hasWriteLock();
try {
// check if the file has an EC policy
boolean isStriped = false;
ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.
getErasureCodingPolicy(fsd.getFSNamesystem(), existing);
if (ecPolicy != null) {
replication = ecPolicy.getId();
isStriped = true;
}
final BlockType blockType = ecPolicy != null?
final BlockType blockType = isStriped ?
BlockType.STRIPED : BlockType.CONTIGUOUS;
final Short replicationFactor = (!isStriped ? replication : null);
final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
if (underConstruction) {
newNode = newINodeFile(id, permissions, modificationTime,
modificationTime, replication, preferredBlockSize, storagePolicyId,
blockType);
modificationTime, replicationFactor, ecPolicyID, preferredBlockSize,
storagePolicyId, blockType);
newNode.toUnderConstruction(clientName, clientMachine);
} else {
newNode = newINodeFile(id, permissions, modificationTime, atime,
replication, preferredBlockSize, storagePolicyId, blockType);
replicationFactor, ecPolicyID, preferredBlockSize,
storagePolicyId, blockType);
}
newNode.setLocalName(localName);
INodesInPath iip = fsd.addINode(existing, newNode,
Expand Down Expand Up @@ -523,15 +527,19 @@ private static INodesInPath addFile(
INodesInPath newiip;
fsd.writeLock();
try {
boolean isStriped = false;
ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.
getErasureCodingPolicy(fsd.getFSNamesystem(), existing);
if (ecPolicy != null) {
replication = ecPolicy.getId();
isStriped = true;
}
final BlockType blockType = ecPolicy != null?
final BlockType blockType = isStriped ?
BlockType.STRIPED : BlockType.CONTIGUOUS;
final Short replicationFactor = (!isStriped ? replication : null);
final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
modTime, modTime, replication, preferredBlockSize, blockType);
modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
blockType);
newNode.setLocalName(localName);
newNode.toUnderConstruction(clientName, clientMachine);
newiip = fsd.addINode(existing, newNode, permissions.getPermission());
Expand Down Expand Up @@ -702,17 +710,17 @@ private static boolean completeFileInternal(

private static INodeFile newINodeFile(
long id, PermissionStatus permissions, long mtime, long atime,
short replication, long preferredBlockSize, byte storagePolicyId,
BlockType blockType) {
Short replication, Byte ecPolicyID, long preferredBlockSize,
byte storagePolicyId, BlockType blockType) {
return new INodeFile(id, null, permissions, mtime, atime,
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
BlockInfo.EMPTY_ARRAY, replication, ecPolicyID, preferredBlockSize,
storagePolicyId, blockType);
}

private static INodeFile newINodeFile(long id, PermissionStatus permissions,
long mtime, long atime, short replication, long preferredBlockSize,
BlockType blockType) {
return newINodeFile(id, permissions, mtime, atime, replication,
long mtime, long atime, Short replication, Byte ecPolicyID,
long preferredBlockSize, BlockType blockType) {
return newINodeFile(id, permissions, mtime, atime, replication, ecPolicyID,
preferredBlockSize, (byte)0, blockType);
}

Expand Down
Expand Up @@ -896,9 +896,9 @@ public INodeFileAttributes loadINodeFileAttributes(DataInput in)
in.readShort());
final long preferredBlockSize = in.readLong();

return new INodeFileAttributes.SnapshotCopy(name, permissions, null, modificationTime,
accessTime, replication, preferredBlockSize, (byte) 0, null,
BlockType.CONTIGUOUS);
return new INodeFileAttributes.SnapshotCopy(name, permissions, null,
modificationTime, accessTime, replication, null, preferredBlockSize,
(byte) 0, null, BlockType.CONTIGUOUS);
}

public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)
Expand Down
Expand Up @@ -329,17 +329,19 @@ private INodeFile loadINodeFile(INodeSection.INode n) {
assert n.getType() == INodeSection.INode.Type.FILE;
INodeSection.INodeFile f = n.getFile();
List<BlockProto> bp = f.getBlocksList();
short replication = (short) f.getReplication();
BlockType blockType = PBHelperClient.convert(f.getBlockType());
LoaderContext state = parent.getLoaderContext();
ErasureCodingPolicy ecPolicy = (blockType == BlockType.STRIPED) ?
ErasureCodingPolicyManager.getPolicyByPolicyID((byte) replication) :
null;
boolean isStriped = f.hasErasureCodingPolicyID();
Short replication = (!isStriped ? (short) f.getReplication() : null);
ErasureCodingPolicy ecPolicy = isStriped ?
ErasureCodingPolicyManager.getPolicyByPolicyID(
(byte) f.getErasureCodingPolicyID()) : null;
Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);

BlockInfo[] blocks = new BlockInfo[bp.size()];
for (int i = 0; i < bp.size(); ++i) {
BlockProto b = bp.get(i);
if (blockType == BlockType.STRIPED) {
if (isStriped) {
blocks[i] = new BlockInfoStriped(PBHelperClient.convert(b), ecPolicy);
} else {
blocks[i] = new BlockInfoContiguous(PBHelperClient.convert(b),
Expand All @@ -352,8 +354,8 @@ private INodeFile loadINodeFile(INodeSection.INode n) {

final INodeFile file = new INodeFile(n.getId(),
n.getName().toByteArray(), permissions, f.getModificationTime(),
f.getAccessTime(), blocks, replication, f.getPreferredBlockSize(),
(byte)f.getStoragePolicyID(), blockType);
f.getAccessTime(), blocks, replication, ecPolicyID,
f.getPreferredBlockSize(), (byte)f.getStoragePolicyID(), blockType);

if (f.hasAcl()) {
int[] entries = AclEntryStatusFormat.toInt(loadAclEntries(
Expand All @@ -376,7 +378,7 @@ private INodeFile loadINodeFile(INodeSection.INode n) {
BlockInfo lastBlk = file.getLastBlock();
// replace the last block of file
final BlockInfo ucBlk;
if (blockType == BlockType.STRIPED) {
if (isStriped) {
BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
ucBlk = new BlockInfoStriped(striped, ecPolicy);
} else {
Expand Down Expand Up @@ -503,10 +505,15 @@ public static INodeSection.INodeFile.Builder buildINodeFile(
.setModificationTime(file.getModificationTime())
.setPermission(buildPermissionStatus(file, state.getStringMap()))
.setPreferredBlockSize(file.getPreferredBlockSize())
.setReplication(file.getFileReplication())
.setStoragePolicyID(file.getLocalStoragePolicyID())
.setBlockType(PBHelperClient.convert(file.getBlockType()));

if (file.isStriped()) {
b.setErasureCodingPolicyID(file.getErasureCodingPolicyID());
} else {
b.setReplication(file.getFileReplication());
}

AclFeature f = file.getAclFeature();
if (f != null) {
b.setAcl(buildAclEntries(f, state.getStringMap()));
Expand Down
Expand Up @@ -62,6 +62,11 @@
public class INodeFile extends INodeWithAdditionalFields
implements INodeFileAttributes, BlockCollection {

/**
 * Replication factor reported for erasure coded (striped) files.
 * Erasure Coded striped blocks have replication factor of 1: the
 * replication getters return this constant for striped files rather
 * than a value decoded from the header.
 */
public static final short DEFAULT_REPL_FOR_STRIPED_BLOCKS = 1;

/** The same as valueOf(inode, path, false). */
public static INodeFile valueOf(INode inode, String path
) throws FileNotFoundException {
Expand Down Expand Up @@ -126,16 +131,20 @@ enum HeaderFormat {
* Different types can be replica or EC
*/
private static final int LAYOUT_BIT_WIDTH = 1;

private static final int MAX_REDUNDANCY = (1 << 11) - 1;

HeaderFormat(LongBitFormat previous, int length, long min) {
BITS = new LongBitFormat(name(), previous, length, min);
}

static short getReplication(long header) {
long layoutRedundancy = BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
return (short) (layoutRedundancy & MAX_REDUNDANCY);
if (isStriped(header)) {
return DEFAULT_REPL_FOR_STRIPED_BLOCKS;
} else {
long layoutRedundancy =
BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
return (short) (layoutRedundancy & MAX_REDUNDANCY);
}
}

static byte getECPolicyID(long header) {
Expand All @@ -158,8 +167,7 @@ static byte getStoragePolicyID(long header) {
static final long BLOCK_TYPE_MASK_STRIPED = 1 << 11;

static boolean isStriped(long header) {
long layoutRedundancy = BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
return (layoutRedundancy & BLOCK_TYPE_MASK) != 0;
return getBlockType(header) == STRIPED;
}

static BlockType getBlockType(long header) {
Expand All @@ -172,22 +180,40 @@ static BlockType getBlockType(long header) {
}
}

static long toLong(long preferredBlockSize, short replication,
BlockType blockType, byte storagePolicyID) {
Preconditions.checkArgument(replication >= 0 &&
replication <= MAX_REDUNDANCY);
/**
 * Construct the block layout redundancy value based on the given BlockType,
 * replication factor and EC policy ID.
 *
 * For a STRIPED block type the result has the striped type bit
 * (BLOCK_TYPE_MASK_STRIPED) set and carries the erasure coding policy ID;
 * for any other block type it carries the replication factor only.
 * Exactly one of {@code replication} and {@code erasureCodingPolicyID}
 * must be supplied, depending on the block type.
 *
 * @param blockType type of the file's blocks
 * @param replication replication factor; must be null for STRIPED,
 *          otherwise non-null and within [0, MAX_REDUNDANCY]
 * @param erasureCodingPolicyID EC policy ID; must be non-null and known to
 *          ErasureCodingPolicyManager for STRIPED, otherwise null
 * @return the layout redundancy value to be combined into the header
 * @throws IllegalArgumentException if the arguments are inconsistent with
 *           the block type, the policy ID is unknown, or the replication
 *           factor is out of range
 */
static long getBlockLayoutRedundancy(final BlockType blockType,
    final Short replication, final Byte erasureCodingPolicyID) {
  long layoutRedundancy = 0;
  if (blockType == STRIPED) {
    Preconditions.checkArgument(replication == null &&
        erasureCodingPolicyID != null,
        "Striped file requires an EC policy ID and no replication factor,"
            + " but got replication=%s, erasureCodingPolicyID=%s",
        replication, erasureCodingPolicyID);
    Preconditions.checkArgument(
        ErasureCodingPolicyManager.getPolicyByPolicyID(
            erasureCodingPolicyID) != null,
        "Could not find EC policy with ID %s", erasureCodingPolicyID);
    layoutRedundancy |= BLOCK_TYPE_MASK_STRIPED;
    // The following bitwise OR with the signed byte erasureCodingPolicyID
    // relies on the policy ID never being negative, so no sign extension
    // can spill into the higher bits.
    layoutRedundancy |= erasureCodingPolicyID;
  } else {
    Preconditions.checkArgument(replication != null &&
        erasureCodingPolicyID == null,
        "Non-striped file requires a replication factor and no EC policy"
            + " ID, but got replication=%s, erasureCodingPolicyID=%s",
        replication, erasureCodingPolicyID);
    Preconditions.checkArgument(replication >= 0 &&
        replication <= MAX_REDUNDANCY,
        "Invalid replication factor %s, expected a value in [0, %s]",
        replication, MAX_REDUNDANCY);
    layoutRedundancy |= replication;
  }
  return layoutRedundancy;
}

static long toLong(long preferredBlockSize, long layoutRedundancy,
byte storagePolicyID) {
long h = 0;
if (preferredBlockSize == 0) {
preferredBlockSize = PREFERRED_BLOCK_SIZE.BITS.getMin();
}
h = PREFERRED_BLOCK_SIZE.BITS.combine(preferredBlockSize, h);
// For erasure coded files, replication is used to store ec policy id
// TODO: this is hacky. Add some utility to generate the layoutRedundancy
long layoutRedundancy = 0;
if (blockType == STRIPED) {
layoutRedundancy |= BLOCK_TYPE_MASK_STRIPED;
}
layoutRedundancy |= replication;
h = BLOCK_LAYOUT_AND_REDUNDANCY.BITS.combine(layoutRedundancy, h);
h = STORAGE_POLICY_ID.BITS.combine(storagePolicyID, h);
return h;
Expand All @@ -202,15 +228,17 @@ static long toLong(long preferredBlockSize, short replication,
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
long atime, BlockInfo[] blklist, short replication,
long preferredBlockSize) {
this(id, name, permissions, mtime, atime, blklist, replication,
this(id, name, permissions, mtime, atime, blklist, replication, null,
preferredBlockSize, (byte) 0, CONTIGUOUS);
}

INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
long atime, BlockInfo[] blklist, short replication,
long atime, BlockInfo[] blklist, Short replication, Byte ecPolicyID,
long preferredBlockSize, byte storagePolicyID, BlockType blockType) {
super(id, name, permissions, mtime, atime);
header = HeaderFormat.toLong(preferredBlockSize, replication, blockType,
final long layoutRedundancy = HeaderFormat.getBlockLayoutRedundancy(
blockType, replication, ecPolicyID);
header = HeaderFormat.toLong(preferredBlockSize, layoutRedundancy,
storagePolicyID);
if (blklist != null && blklist.length > 0) {
for (BlockInfo b : blklist) {
Expand Down Expand Up @@ -462,6 +490,9 @@ public final short getFileReplication(int snapshot) {
* */
@Override // INodeFileAttributes
public final short getFileReplication() {
  // Striped files always report the fixed striped default; everything
  // else is answered by getFileReplication(CURRENT_STATE_ID).
  return isStriped()
      ? DEFAULT_REPL_FOR_STRIPED_BLOCKS
      : getFileReplication(CURRENT_STATE_ID);
}

Expand Down
Expand Up @@ -56,11 +56,13 @@ static class SnapshotCopy extends INodeAttributes.SnapshotCopy

public SnapshotCopy(byte[] name, PermissionStatus permissions,
AclFeature aclFeature, long modificationTime, long accessTime,
short replication, long preferredBlockSize,
Short replication, Byte ecPolicyID, long preferredBlockSize,
byte storagePolicyID, XAttrFeature xAttrsFeature, BlockType blockType) {
super(name, permissions, aclFeature, modificationTime, accessTime,
xAttrsFeature);
header = HeaderFormat.toLong(preferredBlockSize, replication, blockType,
final long layoutRedundancy = HeaderFormat.getBlockLayoutRedundancy(
blockType, replication, ecPolicyID);
header = HeaderFormat.toLong(preferredBlockSize, layoutRedundancy,
storagePolicyID);
}

Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
Expand Down Expand Up @@ -232,9 +233,15 @@ private void loadFileDiffList(InputStream in, INodeFile file, int size)
fileInPb.getXAttrs(), state.getStringTable()));
}

boolean isStriped =
(fileInPb.getBlockType() == BlockTypeProto .STRIPED);
Short replication =
(!isStriped ? (short)fileInPb.getReplication() : null);
Byte ecPolicyID =
(isStriped ? (byte)fileInPb.getErasureCodingPolicyID() : null);
copy = new INodeFileAttributes.SnapshotCopy(pbf.getName()
.toByteArray(), permission, acl, fileInPb.getModificationTime(),
fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
fileInPb.getAccessTime(), replication, ecPolicyID,
fileInPb.getPreferredBlockSize(),
(byte)fileInPb.getStoragePolicyID(), xAttrs,
PBHelperClient.convert(fileInPb.getBlockType()));
Expand Down
Expand Up @@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam;
Expand Down Expand Up @@ -589,7 +590,11 @@ private long lookup(String path) throws IOException {
map.put("pathSuffix",
printSuffix ? inode.getName().toStringUtf8() : "");
map.put("permission", toString(p.getPermission()));
map.put("replication", f.getReplication());
if (f.hasErasureCodingPolicyID()) {
map.put("replication", INodeFile.DEFAULT_REPL_FOR_STRIPED_BLOCKS);
} else {
map.put("replication", f.getReplication());
}
map.put("type", inode.getType());
map.put("fileId", inode.getId());
map.put("childrenNum", 0);
Expand Down
Expand Up @@ -496,9 +496,7 @@ private void dumpINodeFile(INodeSection.INodeFile f) {
o(INODE_SECTION_STORAGE_POLICY_ID, f.getStoragePolicyID());
}
if (f.getBlockType() != BlockTypeProto.CONTIGUOUS) {
out.print("<" + INODE_SECTION_BLOCK_TYPE + ">");
o(SECTION_NAME, f.getBlockType().name());
out.print("</" + INODE_SECTION_BLOCK_TYPE + ">\n");
o(INODE_SECTION_BLOCK_TYPE, f.getBlockType().name());
}

if (f.hasFileUC()) {
Expand Down
Expand Up @@ -141,6 +141,7 @@ message INodeSection {
optional XAttrFeatureProto xAttrs = 9;
optional uint32 storagePolicyID = 10;
optional BlockTypeProto blockType = 11;
optional uint32 erasureCodingPolicyID = 12;
}

message QuotaByStorageTypeEntryProto {
Expand Down

0 comments on commit 55c07bb

Please sign in to comment.