
HDFS-8375. Add cellSize as an XAttr to ECZone. Contributed by Vinayakumar B.
zhe-thoughts authored and Zhe Zhang committed May 26, 2015
1 parent 9145809 commit 91c81fd
Showing 48 changed files with 244 additions and 174 deletions.
@@ -49,7 +49,8 @@ public class HdfsFileStatus {
 
   private final FileEncryptionInfo feInfo;
 
-  private final ECSchema schema;
+  private final ECSchema ecSchema;
+  private final int stripeCellSize;
 
   // Used by dir, not including dot and dotdot. Always zero for a regular file.
   private final int childrenNum;
@@ -76,7 +77,7 @@ public HdfsFileStatus(long length, boolean isdir, int block_replication,
       long blocksize, long modification_time, long access_time,
       FsPermission permission, String owner, String group, byte[] symlink,
       byte[] path, long fileId, int childrenNum, FileEncryptionInfo feInfo,
-      byte storagePolicy, ECSchema schema) {
+      byte storagePolicy, ECSchema ecSchema, int stripeCellSize) {
     this.length = length;
     this.isdir = isdir;
     this.block_replication = (short)block_replication;
Expand All @@ -96,7 +97,8 @@ public HdfsFileStatus(long length, boolean isdir, int block_replication,
this.childrenNum = childrenNum;
this.feInfo = feInfo;
this.storagePolicy = storagePolicy;
this.schema = schema;
this.ecSchema = ecSchema;
this.stripeCellSize = stripeCellSize;
}

/**
@@ -255,7 +257,11 @@ public final FileEncryptionInfo getFileEncryptionInfo() {
   }
 
   public ECSchema getECSchema() {
-    return schema;
+    return ecSchema;
   }
 
+  public int getStripeCellSize() {
+    return stripeCellSize;
+  }
+
   public final int getChildrenNum() {
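
For orientation, here is a hedged sketch of how a caller might consume the two new values together. The helper class and method are illustrative only, not part of this commit; import paths are as of this branch:

```java
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.io.erasurecode.ECSchema;

final class StripingInfoSketch {
  /**
   * Illustrative helper: a file is striped iff its status carries an EC
   * schema; the cell size now rides along in the status itself rather
   * than being derived from schema.getChunkSize().
   */
  static int stripeCellSizeOrZero(HdfsFileStatus status) {
    ECSchema schema = status.getECSchema();
    return schema == null ? 0 : status.getStripeCellSize();
  }
}
```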
@@ -61,7 +61,7 @@ public SnapshottableDirectoryStatus(long modification_time, long access_time,
       int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
     this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
         access_time, permission, owner, group, null, localName, inodeId,
-        childrenNum, null, HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null);
+        childrenNum, null, HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null, 0);
     this.snapshotNumber = snapshotNumber;
     this.snapshotQuota = snapshotQuota;
     this.parentFullPath = parentFullPath;
@@ -132,7 +132,7 @@ static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includesType) {
         blockSize, mTime, aTime, permission, owner, group,
         symlink, DFSUtilClient.string2Bytes(localName),
         fileId, childrenNum, null,
-        storagePolicy, null);
+        storagePolicy, null, 0);
   }
 
   /** Convert a Json map to an ExtendedBlock object. */
8 changes: 5 additions & 3 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -227,9 +227,11 @@
     (Yi Liu via jing9)
 
     HDFS-8320. Erasure coding: consolidate striping-related terminologies. (zhz)
-    HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue
-    configurable in DFSStripedOutputStream. (Li Bo)
+
+    HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue
+    configurable in DFSStripedOutputStream. (Li Bo)
 
     HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker.
     (Rakesh R via waltersu4549)
+
+    HDFS-8375. Add cellSize as an XAttr to ECZone. ( Vinayakumar B via zhz).
@@ -1197,7 +1197,8 @@ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
     if (fileInfo != null) {
       ECSchema schema = fileInfo.getECSchema();
       if (schema != null) {
-        return new DFSStripedInputStream(this, src, verifyChecksum, schema);
+        return new DFSStripedInputStream(this, src, verifyChecksum, schema,
+            fileInfo.getStripeCellSize());
       }
     }
     return new DFSInputStream(this, src, verifyChecksum);
@@ -3009,12 +3010,12 @@ public RemoteIterator<EncryptionZone> listEncryptionZones()
     return new EncryptionZoneIterator(namenode, traceSampler);
   }
 
-  public void createErasureCodingZone(String src, ECSchema schema)
+  public void createErasureCodingZone(String src, ECSchema schema, int cellSize)
       throws IOException {
     checkOpen();
     TraceScope scope = getPathTraceScope("createErasureCodingZone", src);
     try {
-      namenode.createErasureCodingZone(src, schema);
+      namenode.createErasureCodingZone(src, schema, cellSize);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
           SafeModeException.class,
@@ -125,12 +125,12 @@ boolean include(long pos) {
   private final CompletionService<Integer> readingService;
 
   DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
-      ECSchema schema) throws IOException {
+      ECSchema schema, int cellSize) throws IOException {
     super(dfsClient, src, verifyChecksum);
 
     assert schema != null;
     this.schema = schema;
-    cellSize = schema.getChunkSize();
+    this.cellSize = cellSize;
     dataBlkNum = (short) schema.getNumDataUnits();
     parityBlkNum = (short) schema.getNumParityUnits();
     groupSize = dataBlkNum;
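
The extra constructor argument matters because every offset computation in the striped reader is keyed to the cell size, which is now a per-zone property rather than a schema constant. A standalone sketch of the round-robin striping arithmetic involved (illustrative, not the class's actual code):

```java
final class StripeMath {
  /**
   * Round-robin striping: data is laid out cell by cell across
   * dataBlkNum internal blocks. Map an offset inside a block group to
   * {internal block index, offset inside that internal block}.
   */
  static long[] locate(long offsetInBlockGroup, int cellSize, int dataBlkNum) {
    long cellIdx = offsetInBlockGroup / cellSize;  // global cell index
    long blkIdx = cellIdx % dataBlkNum;            // internal block holding it
    long stripeIdx = cellIdx / dataBlkNum;         // stripe (row) number
    long offsetInBlk = stripeIdx * cellSize + offsetInBlockGroup % cellSize;
    return new long[] { blkIdx, offsetInBlk };
  }
}
```

Two zones created with different cell sizes map the same file offset to different internal blocks, which is why the value must come from the zone (via HdfsFileStatus) rather than from the schema.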
@@ -189,7 +189,7 @@ private synchronized void blockSeekTo(long target) throws IOException {
         targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
     // The purpose is to get start offset into each block.
     long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema,
-        targetBlockGroup, offsetIntoBlockGroup);
+        cellSize, targetBlockGroup, offsetIntoBlockGroup);
     Preconditions.checkNotNull(offsetsForInternalBlocks);
 
     final ReaderRetryPolicy retry = new ReaderRetryPolicy();
@@ -514,8 +514,8 @@ protected void fetchBlockByteRange(long blockStartOffset, long start,
     // Refresh the striped block group
     LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
 
-    AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, blockGroup,
-        start, end, buf, offset);
+    AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, cellSize,
+        blockGroup, start, end, buf, offset);
     for (AlignedStripe stripe : stripes) {
       fetchOneStripe(blockGroup, buf, stripe, corruptedBlockMap);
     }
@@ -230,7 +230,7 @@ private StripedDataStreamer getLeadingStreamer() {
 
     final ECSchema schema = stat.getECSchema();
     final int numParityBlocks = schema.getNumParityUnits();
-    cellSize = schema.getChunkSize();
+    cellSize = stat.getStripeCellSize();
     numDataBlocks = schema.getNumDataUnits();
    numAllBlocks = numDataBlocks + numParityBlocks;
@@ -2281,24 +2281,25 @@ public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
    *
    * @param path Directory to create the ec zone
    * @param schema ECSchema for the zone. If not specified default will be used.
+   * @param cellSize Cellsize for the striped erasure coding
    * @throws IOException
    */
-  public void createErasureCodingZone(final Path path, final ECSchema schema)
-      throws IOException {
+  public void createErasureCodingZone(final Path path, final ECSchema schema,
+      final int cellSize) throws IOException {
     Path absF = fixRelativePart(path);
     new FileSystemLinkResolver<Void>() {
       @Override
       public Void doCall(final Path p) throws IOException,
           UnresolvedLinkException {
-        dfs.createErasureCodingZone(getPathName(p), schema);
+        dfs.createErasureCodingZone(getPathName(p), schema, cellSize);
         return null;
       }
 
       @Override
       public Void next(final FileSystem fs, final Path p) throws IOException {
         if (fs instanceof DistributedFileSystem) {
           DistributedFileSystem myDfs = (DistributedFileSystem) fs;
-          myDfs.createErasureCodingZone(p, schema);
+          myDfs.createErasureCodingZone(p, schema, cellSize);
           return null;
         }
         throw new UnsupportedOperationException(
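
A minimal usage sketch of the widened public API; the path, cell size, and cast are illustrative values, and passing null keeps the default schema per the Javadoc above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class CreateEcZoneSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // assumes fs.defaultFS is HDFS
    try (FileSystem fs = FileSystem.get(conf)) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      // null schema = default; 64 KB is an arbitrary example cell size.
      dfs.createErasureCodingZone(new Path("/ec64k"), null, 64 * 1024);
    }
  }
}
```

Passing 0 for cellSize evidently means "unspecified": the protobuf translator further down only serializes the field when the value is positive.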
@@ -1463,7 +1463,7 @@ public List<XAttr> listXAttrs(String src)
    * default
    */
   @AtMostOnce
-  public void createErasureCodingZone(String src, ECSchema schema)
+  public void createErasureCodingZone(String src, ECSchema schema, int cellSize)
       throws IOException;
 
   /**
@@ -25,10 +25,12 @@ public class ErasureCodingZoneInfo {
 
   private String dir;
   private ECSchema schema;
+  private int cellSize;
 
-  public ErasureCodingZoneInfo(String dir, ECSchema schema) {
+  public ErasureCodingZoneInfo(String dir, ECSchema schema, int cellSize) {
     this.dir = dir;
     this.schema = schema;
+    this.cellSize = cellSize;
   }
 
   /**
@@ -49,8 +51,16 @@ public ECSchema getSchema() {
     return schema;
   }
 
+  /**
+   * Get cellSize for the EC Zone
+   */
+  public int getCellSize() {
+    return cellSize;
+  }
+
   @Override
   public String toString() {
-    return "Dir: " + getDir() + ", Schema: " + schema;
+    return "Dir: " + getDir() + ", Schema: " + schema + ", cellSize: "
+        + cellSize;
   }
 }
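
A small sketch of the value object after the change; constructing a real ECSchema is out of scope here, so it is taken as a parameter, and the directory and cell size are made-up values:

```java
import org.apache.hadoop.hdfs.protocol.ErasureCodingZoneInfo;
import org.apache.hadoop.io.erasurecode.ECSchema;

final class ZoneInfoSketch {
  // toString() above reflects both fields, e.g.
  // "Dir: /ec64k, Schema: <schema>, cellSize: 65536".
  static String describe(ECSchema schema) {
    return new ErasureCodingZoneInfo("/ec64k", schema, 64 * 1024).toString();
  }
}
```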
@@ -59,10 +59,11 @@ public HdfsLocatedFileStatus(long length, boolean isdir,
       int block_replication, long blocksize, long modification_time,
       long access_time, FsPermission permission, String owner, String group,
       byte[] symlink, byte[] path, long fileId, LocatedBlocks locations,
-      int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy, ECSchema schema) {
+      int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy,
+      ECSchema schema, int stripeCellSize) {
     super(length, isdir, block_replication, blocksize, modification_time,
         access_time, permission, owner, group, symlink, path, fileId,
-        childrenNum, feInfo, storagePolicy, schema);
+        childrenNum, feInfo, storagePolicy, schema, stripeCellSize);
     this.locations = locations;
   }
 
@@ -1408,7 +1408,8 @@ public CreateErasureCodingZoneResponseProto createErasureCodingZone(
     try {
       ECSchema schema = req.hasSchema() ? PBHelper.convertECSchema(req
           .getSchema()) : null;
-      server.createErasureCodingZone(req.getSrc(), schema);
+      int cellSize = req.hasCellSize() ? req.getCellSize() : 0;
+      server.createErasureCodingZone(req.getSrc(), schema, cellSize);
       return CreateErasureCodingZoneResponseProto.newBuilder().build();
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -1422,14 +1422,17 @@ public BatchedEntries<EncryptionZone> listEncryptionZones(long id)
   }
 
   @Override
-  public void createErasureCodingZone(String src, ECSchema schema)
+  public void createErasureCodingZone(String src, ECSchema schema, int cellSize)
       throws IOException {
     final CreateErasureCodingZoneRequestProto.Builder builder =
         CreateErasureCodingZoneRequestProto.newBuilder();
     builder.setSrc(src);
     if (schema != null) {
       builder.setSchema(PBHelper.convertECSchema(schema));
     }
+    if (cellSize > 0) {
+      builder.setCellSize(cellSize);
+    }
     CreateErasureCodingZoneRequestProto req = builder.build();
     try {
       rpcProxy.createErasureCodingZone(null, req);
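
The `cellSize > 0` guard pairs with the server-side `req.hasCellSize()` check above: the protobuf field is evidently optional, so an unset field on the wire, not a sentinel value, means "use the default". A hedged illustration, assuming only the generated builder methods this diff itself calls (the generated class's package is omitted):

```java
// Hypothetical snippet; only setSrc/setCellSize/hasCellSize/getCellSize,
// which this diff already calls, are assumed to exist.
CreateErasureCodingZoneRequestProto unset =
    CreateErasureCodingZoneRequestProto.newBuilder().setSrc("/zone").build();
boolean specified = unset.hasCellSize();        // false: server falls back to 0

CreateErasureCodingZoneRequestProto set =
    unset.toBuilder().setCellSize(64 * 1024).build();
int cellSize = set.hasCellSize() ? set.getCellSize() : 0;  // 65536
```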
@@ -1506,7 +1506,8 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
         fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null,
         fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
             : HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED,
-        fs.hasEcSchema() ? PBHelper.convertECSchema(fs.getEcSchema()) : null);
+        fs.hasEcSchema() ? PBHelper.convertECSchema(fs.getEcSchema()) : null,
+        fs.hasStripeCellSize() ? fs.getStripeCellSize() : 0);
   }
 
   public static SnapshottableDirectoryStatus convert(
@@ -1570,6 +1571,7 @@ public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
     if(fs.getECSchema() != null) {
       builder.setEcSchema(PBHelper.convertECSchema(fs.getECSchema()));
     }
+    builder.setStripeCellSize(fs.getStripeCellSize());
     return builder.build();
   }

@@ -3157,12 +3159,14 @@ public static ECSchemaProto convertECSchema(ECSchema schema) {
 
   public static ErasureCodingZoneInfoProto convertECZoneInfo(ErasureCodingZoneInfo ecZoneInfo) {
     return ErasureCodingZoneInfoProto.newBuilder().setDir(ecZoneInfo.getDir())
-        .setSchema(convertECSchema(ecZoneInfo.getSchema())).build();
+        .setSchema(convertECSchema(ecZoneInfo.getSchema()))
+        .setCellSize(ecZoneInfo.getCellSize()).build();
   }
 
   public static ErasureCodingZoneInfo convertECZoneInfo(ErasureCodingZoneInfoProto ecZoneInfoProto) {
     return new ErasureCodingZoneInfo(ecZoneInfoProto.getDir(),
-        convertECSchema(ecZoneInfoProto.getSchema()));
+        convertECSchema(ecZoneInfoProto.getSchema()),
+        ecZoneInfoProto.getCellSize());
   }
 
   public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
@@ -3196,9 +3200,11 @@ public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
     }
 
     ECSchema ecSchema = convertECSchema(blockEcRecoveryInfoProto.getEcSchema());
+    int cellSize = blockEcRecoveryInfoProto.getCellSize();
 
     return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
-        targetStorageUuids, convertStorageTypes, liveBlkIndices, ecSchema);
+        targetStorageUuids, convertStorageTypes, liveBlkIndices, ecSchema,
+        cellSize);
   }
 
   public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
@@ -3224,6 +3230,7 @@ public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
     builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
 
     builder.setEcSchema(convertECSchema(blockEcRecoveryInfo.getECSchema()));
+    builder.setCellSize(blockEcRecoveryInfo.getCellSize());
 
     return builder.build();
   }
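
Taken together, the two PBHelper changes make the recovery task round-trip losslessly. A unit-test-style sketch (the overloads are the ones shown above; `getCellSize()` on BlockECRecoveryInfo is implied by the setter call in this diff):

```java
// Sketch: cellSize should survive
// BlockECRecoveryInfo -> BlockECRecoveryInfoProto -> BlockECRecoveryInfo.
static void assertCellSizeRoundTrips(BlockECRecoveryInfo before) {
  BlockECRecoveryInfoProto proto = PBHelper.convertBlockECRecoveryInfo(before);
  BlockECRecoveryInfo after = PBHelper.convertBlockECRecoveryInfo(proto);
  assert after.getCellSize() == before.getCellSize();
}
```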
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZoneInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -1560,14 +1561,14 @@ int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
           assert rw instanceof ErasureCodingWork;
           assert rw.targets.length > 0;
           String src = block.getBlockCollection().getName();
-          ECSchema ecSchema = null;
+          ErasureCodingZoneInfo ecZoneInfo = null;
           try {
-            ecSchema = namesystem.getECSchemaForPath(src);
+            ecZoneInfo = namesystem.getErasureCodingZoneInfoForPath(src);
           } catch (IOException e) {
             blockLog
-                .warn("Failed to get the EC schema for the file {} ", src);
+                .warn("Failed to get the EC zone info for the file {} ", src);
           }
-          if (ecSchema == null) {
+          if (ecZoneInfo == null) {
             blockLog.warn("No EC schema found for the file {}. "
                 + "So cannot proceed for recovery", src);
             // TODO: we may have to revisit later for what we can do better to
@@ -1577,7 +1578,8 @@ int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
           rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
               new ExtendedBlock(namesystem.getBlockPoolId(), block),
               rw.srcNodes, rw.targets,
-              ((ErasureCodingWork) rw).liveBlockIndicies, ecSchema);
+              ((ErasureCodingWork) rw).liveBlockIndicies,
+              ecZoneInfo.getSchema(), ecZoneInfo.getCellSize());
         } else {
           rw.srcNodes[0].addBlockToBeReplicated(block, targets);
         }
@@ -610,10 +610,10 @@ void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
    */
   void addBlockToBeErasureCoded(ExtendedBlock block,
       DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
-      short[] liveBlockIndices, ECSchema ecSchema) {
+      short[] liveBlockIndices, ECSchema ecSchema, int cellSize) {
     assert (block != null && sources != null && sources.length > 0);
     BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
-        liveBlockIndices, ecSchema);
+        liveBlockIndices, ecSchema, cellSize);
     erasurecodeBlocks.offer(task);
     BlockManager.LOG.debug("Adding block recovery task " + task + "to "
         + getName() + ", current queue size is " + erasurecodeBlocks.size());
@@ -269,7 +269,7 @@ private class ReconstructAndTransferBlock implements Runnable {
       ECSchema schema = recoveryInfo.getECSchema();
       dataBlkNum = schema.getNumDataUnits();
       parityBlkNum = schema.getNumParityUnits();
-      cellSize = schema.getChunkSize();
+      cellSize = recoveryInfo.getCellSize();
 
       blockGroup = recoveryInfo.getExtendedBlock();

0 comments on commit 91c81fd